| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import itertools |
| import numpy as np |
| |
| from pyspark import since, keyword_only |
| from pyspark.ml import Estimator, Model |
| from pyspark.ml.param import Params, Param, TypeConverters |
| from pyspark.ml.param.shared import HasSeed |
| from pyspark.sql.functions import rand |
| |
| __all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel', 'TrainValidationSplit', |
| 'TrainValidationSplitModel'] |
| |
| |
| class ParamGridBuilder(object): |
| r""" |
| Builder for a param grid used in grid search-based model selection. |
| |
| >>> from pyspark.ml.classification import LogisticRegression |
| >>> lr = LogisticRegression() |
| >>> output = ParamGridBuilder() \ |
| ... .baseOn({lr.labelCol: 'l'}) \ |
| ... .baseOn([lr.predictionCol, 'p']) \ |
| ... .addGrid(lr.regParam, [1.0, 2.0]) \ |
| ... .addGrid(lr.maxIter, [1, 5]) \ |
| ... .build() |
| >>> expected = [ |
| ... {lr.regParam: 1.0, lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, |
| ... {lr.regParam: 2.0, lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, |
| ... {lr.regParam: 1.0, lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}, |
| ... {lr.regParam: 2.0, lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}] |
| >>> len(output) == len(expected) |
| True |
| >>> all([m in expected for m in output]) |
| True |
| |
| .. versionadded:: 1.4.0 |
| """ |
| |
| def __init__(self): |
| self._param_grid = {} |
| |
| @since("1.4.0") |
| def addGrid(self, param, values): |
| """ |
| Sets the given parameters in this grid to fixed values. |
| """ |
| self._param_grid[param] = values |
| |
| return self |
| |
| @since("1.4.0") |
| def baseOn(self, *args): |
| """ |
| Sets the given parameters in this grid to fixed values. |
| Accepts either a parameter dictionary or a list of (parameter, value) pairs. |
| """ |
| if isinstance(args[0], dict): |
| self.baseOn(*args[0].items()) |
| else: |
| for (param, value) in args: |
| self.addGrid(param, [value]) |
| |
| return self |
| |
| @since("1.4.0") |
| def build(self): |
| """ |
| Builds and returns all combinations of parameters specified |
| by the param grid. |
| """ |
| keys = self._param_grid.keys() |
| grid_values = self._param_grid.values() |
| return [dict(zip(keys, prod)) for prod in itertools.product(*grid_values)] |
| |
| |
| class ValidatorParams(HasSeed): |
| """ |
| Common params for TrainValidationSplit and CrossValidator. |
| """ |
| |
| estimator = Param(Params._dummy(), "estimator", "estimator to be cross-validated") |
| estimatorParamMaps = Param(Params._dummy(), "estimatorParamMaps", "estimator param maps") |
| evaluator = Param( |
| Params._dummy(), "evaluator", |
| "evaluator used to select hyper-parameters that maximize the validator metric") |
| |
| def setEstimator(self, value): |
| """ |
| Sets the value of :py:attr:`estimator`. |
| """ |
| return self._set(estimator=value) |
| |
| def getEstimator(self): |
| """ |
| Gets the value of estimator or its default value. |
| """ |
| return self.getOrDefault(self.estimator) |
| |
| def setEstimatorParamMaps(self, value): |
| """ |
| Sets the value of :py:attr:`estimatorParamMaps`. |
| """ |
| return self._set(estimatorParamMaps=value) |
| |
| def getEstimatorParamMaps(self): |
| """ |
| Gets the value of estimatorParamMaps or its default value. |
| """ |
| return self.getOrDefault(self.estimatorParamMaps) |
| |
| def setEvaluator(self, value): |
| """ |
| Sets the value of :py:attr:`evaluator`. |
| """ |
| return self._set(evaluator=value) |
| |
| def getEvaluator(self): |
| """ |
| Gets the value of evaluator or its default value. |
| """ |
| return self.getOrDefault(self.evaluator) |
| |
| |
| class CrossValidator(Estimator, ValidatorParams): |
| """ |
| |
| K-fold cross validation performs model selection by splitting the dataset into a set of |
| non-overlapping randomly partitioned folds which are used as separate training and test datasets |
| e.g., with k=3 folds, K-fold cross validation will generate 3 (training, test) dataset pairs, |
| each of which uses 2/3 of the data for training and 1/3 for testing. Each fold is used as the |
| test set exactly once. |
| |
| |
| >>> from pyspark.ml.classification import LogisticRegression |
| >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator |
| >>> from pyspark.ml.linalg import Vectors |
| >>> dataset = spark.createDataFrame( |
| ... [(Vectors.dense([0.0]), 0.0), |
| ... (Vectors.dense([0.4]), 1.0), |
| ... (Vectors.dense([0.5]), 0.0), |
| ... (Vectors.dense([0.6]), 1.0), |
| ... (Vectors.dense([1.0]), 1.0)] * 10, |
| ... ["features", "label"]) |
| >>> lr = LogisticRegression() |
| >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() |
| >>> evaluator = BinaryClassificationEvaluator() |
| >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) |
| >>> cvModel = cv.fit(dataset) |
| >>> cvModel.avgMetrics[0] |
| 0.5 |
| >>> evaluator.evaluate(cvModel.transform(dataset)) |
| 0.8333... |
| |
| .. versionadded:: 1.4.0 |
| """ |
| |
| numFolds = Param(Params._dummy(), "numFolds", "number of folds for cross validation", |
| typeConverter=TypeConverters.toInt) |
| |
| @keyword_only |
| def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3, |
| seed=None): |
| """ |
| __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\ |
| seed=None) |
| """ |
| super(CrossValidator, self).__init__() |
| self._setDefault(numFolds=3) |
| kwargs = self._input_kwargs |
| self._set(**kwargs) |
| |
| @keyword_only |
| @since("1.4.0") |
| def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3, |
| seed=None): |
| """ |
        setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
                  seed=None)
| Sets params for cross validator. |
| """ |
| kwargs = self._input_kwargs |
| return self._set(**kwargs) |
| |
| @since("1.4.0") |
| def setNumFolds(self, value): |
| """ |
| Sets the value of :py:attr:`numFolds`. |
| """ |
| return self._set(numFolds=value) |
| |
| @since("1.4.0") |
| def getNumFolds(self): |
| """ |
| Gets the value of numFolds or its default value. |
| """ |
| return self.getOrDefault(self.numFolds) |
| |
| def _fit(self, dataset): |
| est = self.getOrDefault(self.estimator) |
| epm = self.getOrDefault(self.estimatorParamMaps) |
| numModels = len(epm) |
| eva = self.getOrDefault(self.evaluator) |
| nFolds = self.getOrDefault(self.numFolds) |
| seed = self.getOrDefault(self.seed) |
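        # Each row is tagged with a uniform random value in [0, 1); fold i's
        # validation set is the rows whose value falls in [i/k, (i+1)/k), so the
        # k validation sets are disjoint and together cover the whole dataset.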
| h = 1.0 / nFolds |
| randCol = self.uid + "_rand" |
| df = dataset.select("*", rand(seed).alias(randCol)) |
| metrics = [0.0] * numModels |
| for i in range(nFolds): |
| validateLB = i * h |
| validateUB = (i + 1) * h |
| condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB) |
| validation = df.filter(condition) |
| train = df.filter(~condition) |
| models = est.fit(train, epm) |
| for j in range(numModels): |
| model = models[j] |
| # TODO: duplicate evaluator to take extra params from input |
| metric = eva.evaluate(model.transform(validation, epm[j])) |
                metrics[j] += metric / nFolds
| |
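        # Pick the param map with the best average metric (largest for metrics such
        # as areaUnderROC, smallest for ones such as RMSE) and refit it on the full
        # dataset to produce the final model.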
| if eva.isLargerBetter(): |
| bestIndex = np.argmax(metrics) |
| else: |
| bestIndex = np.argmin(metrics) |
| bestModel = est.fit(dataset, epm[bestIndex]) |
| return self._copyValues(CrossValidatorModel(bestModel, metrics)) |
| |
| @since("1.4.0") |
| def copy(self, extra=None): |
| """ |
| Creates a copy of this instance with a randomly generated uid |
        and some extra params. This creates a deep copy of the embedded paramMap
        and copies the embedded and extra parameters over.
| |
| :param extra: Extra parameters to copy to the new instance |
| :return: Copy of this instance |
| """ |
| if extra is None: |
| extra = dict() |
| newCV = Params.copy(self, extra) |
| if self.isSet(self.estimator): |
| newCV.setEstimator(self.getEstimator().copy(extra)) |
| # estimatorParamMaps remain the same |
| if self.isSet(self.evaluator): |
| newCV.setEvaluator(self.getEvaluator().copy(extra)) |
| return newCV |
| |
| |
| class CrossValidatorModel(Model, ValidatorParams): |
| """ |
| |
| CrossValidatorModel contains the model with the highest average cross-validation |
| metric across folds and uses this model to transform input data. CrossValidatorModel |
| also tracks the metrics for each param map evaluated. |
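
    A minimal usage sketch, assuming ``cvModel`` and ``dataset`` were created as in
    the :py:class:`CrossValidator` example above (skipped during doctests):

    >>> bestLrModel = cvModel.bestModel  # doctest: +SKIP
    >>> predictions = cvModel.transform(dataset)  # doctest: +SKIP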
| |
| .. versionadded:: 1.4.0 |
| """ |
| |
    def __init__(self, bestModel, avgMetrics=None):
| super(CrossValidatorModel, self).__init__() |
| #: best model from cross validation |
| self.bestModel = bestModel |
| #: Average cross-validation metrics for each paramMap in |
| #: CrossValidator.estimatorParamMaps, in the corresponding order. |
        # default to a fresh list rather than a shared mutable default argument
        self.avgMetrics = avgMetrics if avgMetrics is not None else []
| |
| def _transform(self, dataset): |
| return self.bestModel.transform(dataset) |
| |
| @since("1.4.0") |
| def copy(self, extra=None): |
| """ |
| Creates a copy of this instance with a randomly generated uid |
        and some extra params. This copies the underlying bestModel,
        creates a deep copy of the embedded paramMap, and
        copies the embedded and extra parameters over. The avgMetrics
        list is not copied; it is shared with the new instance.
| |
| :param extra: Extra parameters to copy to the new instance |
| :return: Copy of this instance |
| """ |
| if extra is None: |
| extra = dict() |
| bestModel = self.bestModel.copy(extra) |
| avgMetrics = self.avgMetrics |
| return CrossValidatorModel(bestModel, avgMetrics) |
| |
| |
| class TrainValidationSplit(Estimator, ValidatorParams): |
| """ |
| .. note:: Experimental |
| |
| Validation for hyper-parameter tuning. Randomly splits the input dataset into train and |
| validation sets, and uses evaluation metric on the validation set to select the best model. |
| Similar to :class:`CrossValidator`, but only splits the set once. |
| |
| >>> from pyspark.ml.classification import LogisticRegression |
| >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator |
| >>> from pyspark.ml.linalg import Vectors |
| >>> dataset = spark.createDataFrame( |
| ... [(Vectors.dense([0.0]), 0.0), |
| ... (Vectors.dense([0.4]), 1.0), |
| ... (Vectors.dense([0.5]), 0.0), |
| ... (Vectors.dense([0.6]), 1.0), |
| ... (Vectors.dense([1.0]), 1.0)] * 10, |
| ... ["features", "label"]) |
| >>> lr = LogisticRegression() |
| >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() |
| >>> evaluator = BinaryClassificationEvaluator() |
| >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) |
| >>> tvsModel = tvs.fit(dataset) |
| >>> evaluator.evaluate(tvsModel.transform(dataset)) |
| 0.8333... |
| |
| .. versionadded:: 2.0.0 |
| """ |
| |
    trainRatio = Param(Params._dummy(), "trainRatio", "Param for ratio between train and "
                       "validation data. Must be between 0 and 1.",
                       typeConverter=TypeConverters.toFloat)
| |
| @keyword_only |
| def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75, |
| seed=None): |
| """ |
| __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\ |
| seed=None) |
| """ |
| super(TrainValidationSplit, self).__init__() |
| self._setDefault(trainRatio=0.75) |
| kwargs = self._input_kwargs |
| self._set(**kwargs) |
| |
| @since("2.0.0") |
| @keyword_only |
| def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75, |
| seed=None): |
| """ |
        setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
                  seed=None)
| Sets params for the train validation split. |
| """ |
| kwargs = self._input_kwargs |
| return self._set(**kwargs) |
| |
| @since("2.0.0") |
| def setTrainRatio(self, value): |
| """ |
| Sets the value of :py:attr:`trainRatio`. |
| """ |
| return self._set(trainRatio=value) |
| |
| @since("2.0.0") |
| def getTrainRatio(self): |
| """ |
| Gets the value of trainRatio or its default value. |
| """ |
| return self.getOrDefault(self.trainRatio) |
| |
| def _fit(self, dataset): |
| est = self.getOrDefault(self.estimator) |
| epm = self.getOrDefault(self.estimatorParamMaps) |
| numModels = len(epm) |
| eva = self.getOrDefault(self.evaluator) |
| tRatio = self.getOrDefault(self.trainRatio) |
| seed = self.getOrDefault(self.seed) |
| randCol = self.uid + "_rand" |
| df = dataset.select("*", rand(seed).alias(randCol)) |
| metrics = [0.0] * numModels |
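        # Split once: rows whose random value is at or above trainRatio form the
        # validation set; the remaining rows form the training set.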
| condition = (df[randCol] >= tRatio) |
| validation = df.filter(condition) |
| train = df.filter(~condition) |
| models = est.fit(train, epm) |
| for j in range(numModels): |
| model = models[j] |
| metric = eva.evaluate(model.transform(validation, epm[j])) |
| metrics[j] += metric |
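        # As with CrossValidator, pick the param map with the best validation
        # metric and refit it on the full dataset.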
| if eva.isLargerBetter(): |
| bestIndex = np.argmax(metrics) |
| else: |
| bestIndex = np.argmin(metrics) |
| bestModel = est.fit(dataset, epm[bestIndex]) |
| return self._copyValues(TrainValidationSplitModel(bestModel, metrics)) |
| |
| @since("2.0.0") |
| def copy(self, extra=None): |
| """ |
| Creates a copy of this instance with a randomly generated uid |
        and some extra params. This creates a deep copy of the embedded paramMap
        and copies the embedded and extra parameters over.
| |
| :param extra: Extra parameters to copy to the new instance |
| :return: Copy of this instance |
| """ |
| if extra is None: |
| extra = dict() |
| newTVS = Params.copy(self, extra) |
| if self.isSet(self.estimator): |
| newTVS.setEstimator(self.getEstimator().copy(extra)) |
| # estimatorParamMaps remain the same |
| if self.isSet(self.evaluator): |
| newTVS.setEvaluator(self.getEvaluator().copy(extra)) |
| return newTVS |
| |
| |
| class TrainValidationSplitModel(Model, ValidatorParams): |
| """ |
| .. note:: Experimental |
| |
    Model from train validation split. It contains the best model selected on the
    validation set and uses it to transform input data.
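
    A minimal usage sketch, assuming ``tvsModel`` and ``dataset`` were created as in
    the :py:class:`TrainValidationSplit` example above (skipped during doctests):

    >>> bestLrModel = tvsModel.bestModel  # doctest: +SKIP
    >>> predictions = tvsModel.transform(dataset)  # doctest: +SKIP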
| |
| .. versionadded:: 2.0.0 |
| """ |
| |
    def __init__(self, bestModel, validationMetrics=None):
| super(TrainValidationSplitModel, self).__init__() |
        #: best model from train validation split
| self.bestModel = bestModel |
| #: evaluated validation metrics |
        # default to a fresh list rather than a shared mutable default argument
        self.validationMetrics = validationMetrics if validationMetrics is not None else []
| |
| def _transform(self, dataset): |
| return self.bestModel.transform(dataset) |
| |
| @since("2.0.0") |
| def copy(self, extra=None): |
| """ |
| Creates a copy of this instance with a randomly generated uid |
| and some extra params. This copies the underlying bestModel, |
| creates a deep copy of the embedded paramMap, and |
| copies the embedded and extra parameters over. |
        It also creates a shallow copy of the validationMetrics list.
| |
| :param extra: Extra parameters to copy to the new instance |
| :return: Copy of this instance |
| """ |
| if extra is None: |
| extra = dict() |
| bestModel = self.bestModel.copy(extra) |
| validationMetrics = list(self.validationMetrics) |
| return TrainValidationSplitModel(bestModel, validationMetrics) |
| |
| |
| if __name__ == "__main__": |
    import doctest
    import sys
| |
| from pyspark.sql import SparkSession |
| globs = globals().copy() |
| |
    # Create a local SparkSession with two worker threads to run the doctests above.
| spark = SparkSession.builder\ |
| .master("local[2]")\ |
| .appName("ml.tuning tests")\ |
| .getOrCreate() |
| sc = spark.sparkContext |
| globs['sc'] = sc |
| globs['spark'] = spark |
| (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) |
| spark.stop() |
| if failure_count: |
        sys.exit(-1)