blob: 5c793dc344c7b1f78d2cd34671b5991d449d262e [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import tempfile
import unittest
from shutil import rmtree
import numpy as np
from pyspark.ml.linalg import Vectors, Matrices
from pyspark.sql import DataFrame, Row
from pyspark.ml.classification import (
NaiveBayes,
NaiveBayesModel,
LinearSVC,
LinearSVCModel,
LinearSVCSummary,
LinearSVCTrainingSummary,
LogisticRegression,
LogisticRegressionModel,
LogisticRegressionSummary,
BinaryLogisticRegressionSummary,
FMClassifier,
FMClassificationModel,
FMClassificationSummary,
FMClassificationTrainingSummary,
DecisionTreeClassifier,
DecisionTreeClassificationModel,
RandomForestClassifier,
RandomForestClassificationModel,
RandomForestClassificationSummary,
RandomForestClassificationTrainingSummary,
BinaryRandomForestClassificationSummary,
BinaryRandomForestClassificationTrainingSummary,
GBTClassifier,
GBTClassificationModel,
MultilayerPerceptronClassifier,
MultilayerPerceptronClassificationModel,
MultilayerPerceptronClassificationSummary,
MultilayerPerceptronClassificationTrainingSummary,
)
from pyspark.ml.regression import DecisionTreeRegressionModel
from pyspark.testing.sqlutils import ReusedSQLTestCase
class ClassificationTestsMixin:
    def test_naive_bayes(self):
        """Fit a multinomial NaiveBayes with per-row weights and verify getter
        params, model coefficients (pi/theta/sigma), local prediction APIs,
        transform output schema/count, and estimator/model save & load."""
        spark = self.spark
        df = spark.createDataFrame(
            [
                Row(label=0.0, weight=0.1, features=Vectors.dense([0.0, 0.0])),
                Row(label=0.0, weight=0.5, features=Vectors.dense([0.0, 1.0])),
                Row(label=1.0, weight=1.0, features=Vectors.dense([1.0, 0.0])),
            ]
        )
        nb = NaiveBayes(smoothing=1.0, modelType="multinomial", weightCol="weight")
        self.assertEqual(nb.getSmoothing(), 1.0)
        self.assertEqual(nb.getModelType(), "multinomial")
        self.assertEqual(nb.getWeightCol(), "weight")
        model = nb.fit(df)
        # fitted model shares the estimator uid
        self.assertEqual(model.uid, nb.uid)
        self.assertEqual(model.numClasses, 2)
        self.assertEqual(model.numFeatures, 2)
        self.assertTrue(
            np.allclose(model.pi.toArray(), [-0.81093022, -0.58778666], atol=1e-4), model.pi
        )
        self.assertTrue(
            np.allclose(
                model.theta.toArray(),
                [[-0.91629073, -0.51082562], [-0.40546511, -1.09861229]],
                atol=1e-4,
            ),
            model.theta,
        )
        # sigma is expected to be empty for the multinomial model type
        self.assertTrue(np.allclose(model.sigma.toArray(), [], atol=1e-4), model.sigma)
        vec = Vectors.dense(0.0, 5.0)
        self.assertEqual(model.predict(vec), 0.0)
        pred = model.predictRaw(vec)
        self.assertTrue(np.allclose(pred.toArray(), [-3.36505834, -6.08084811], atol=1e-4), pred)
        pred = model.predictProbability(vec)
        self.assertTrue(np.allclose(pred.toArray(), [0.93795196, 0.06204804], atol=1e-4), pred)
        output = model.transform(df)
        expected_cols = [
            "label",
            "weight",
            "features",
            "rawPrediction",
            "probability",
            "prediction",
        ]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 3)
        # Model save & load
        with tempfile.TemporaryDirectory(prefix="naive_bayes") as d:
            nb.write().overwrite().save(d)
            nb2 = NaiveBayes.load(d)
            self.assertEqual(str(nb), str(nb2))
            model.write().overwrite().save(d)
            model2 = NaiveBayesModel.load(d)
            self.assertEqual(str(model), str(model2))
    def test_binomial_logistic_regression_with_bound(self):
        """Fit a weighted binomial LogisticRegression with box constraints on
        coefficients/intercepts and verify the bounded solution survives
        estimator and model save/load round-trips."""
        df = self.spark.createDataFrame(
            [
                (1.0, 1.0, Vectors.dense(0.0, 5.0)),
                (0.0, 2.0, Vectors.dense(1.0, 2.0)),
                (1.0, 3.0, Vectors.dense(2.0, 1.0)),
                (0.0, 4.0, Vectors.dense(3.0, 3.0)),
            ],
            ["label", "weight", "features"],
        )
        lor = LogisticRegression(
            regParam=0.01,
            weightCol="weight",
            lowerBoundsOnCoefficients=Matrices.dense(1, 2, [-1.0, -1.0]),
            upperBoundsOnIntercepts=Vectors.dense(0.0),
        )
        lor_model = lor.fit(df)
        self.assertEqual(lor.uid, lor_model.uid)

        def check_result(model: LogisticRegressionModel) -> None:
            # the loaded model must reproduce the same bounded solution
            self.assertTrue(
                np.allclose(model.coefficients.toArray(), [-0.2944, -0.0484], atol=1e-4)
            )
            self.assertTrue(np.isclose(model.intercept, 0.0, atol=1e-4))

        check_result(lor_model)
        with tempfile.TemporaryDirectory(prefix="blr") as tmp_dir:
            lor.write().overwrite().save(tmp_dir)
            lor2 = LogisticRegression.load(tmp_dir)
            self.assertEqual(str(lor), str(lor2))
            self.assertEqual(lor.getRegParam(), 0.01)
            self.assertEqual(lor2.getRegParam(), 0.01)
            lor_model.write().overwrite().save(tmp_dir)
            loaded_model = LogisticRegressionModel.load(tmp_dir)
            self.assertEqual(str(lor_model), str(loaded_model))
            self.assertEqual(lor_model.getRegParam(), 0.01)
            self.assertEqual(loaded_model.getRegParam(), 0.01)
            check_result(loaded_model)
    def test_multinomial_logistic_regression_with_bound(self):
        """Fit a multinomial LogisticRegression with lower coefficient bounds
        on the libsvm sample data and verify the solution and model
        persistence (estimator read/write is covered by the binomial test)."""
        data_path = "data/mllib/sample_multiclass_classification_data.txt"
        df = self.spark.read.format("libsvm").load(data_path)
        lor = LogisticRegression(
            regParam=0.01,
            lowerBoundsOnCoefficients=Matrices.dense(3, 4, range(12)),
            upperBoundsOnIntercepts=Vectors.dense(0.0, 0.0, 0.0),
        )
        lor_model = lor.fit(df)
        self.assertEqual(lor.uid, lor_model.uid)

        def check_result(model: LogisticRegressionModel) -> None:
            expected = [
                [4.593, 4.5516, 9.0099, 12.2904],
                [1.0, 8.1093, 7.0, 10.0],
                [3.041, 5.0, 8.0, 11.0],
            ]
            # compare row by row against the expected coefficient matrix
            for i in range(0, len(expected)):
                self.assertTrue(
                    np.allclose(model.coefficientMatrix.toArray()[i], expected[i], atol=1e-4)
                )
            self.assertTrue(
                np.allclose(model.interceptVector.toArray(), [-0.9057, -1.1392, -0.0033], atol=1e-4)
            )

        check_result(lor_model)
        # test_binomial_logistic_regression_with_bound has covered the read/write for LR
        with tempfile.TemporaryDirectory(prefix="mlr") as tmp_dir:
            lor_model.write().overwrite().save(tmp_dir)
            loaded_model = LogisticRegressionModel.load(tmp_dir)
            self.assertEqual(str(lor_model), str(loaded_model))
            check_result(loaded_model)
def test_logistic_regression_with_threshold(self):
df = self.spark.createDataFrame(
[
(1.0, 1.0, Vectors.dense(0.0, 5.0)),
(0.0, 2.0, Vectors.dense(1.0, 2.0)),
(1.0, 3.0, Vectors.dense(2.0, 1.0)),
(0.0, 4.0, Vectors.dense(3.0, 3.0)),
],
["label", "weight", "features"],
)
lor = LogisticRegression(weightCol="weight")
model = lor.fit(df)
self.assertEqual(lor.uid, model.uid)
# status changes 1
for t in [0.0, 0.1, 0.2, 0.5, 1.0]:
model.setThreshold(t).transform(df)
# status changes 2
[model.setThreshold(t).predict(Vectors.dense(0.0, 5.0)) for t in [0.0, 0.1, 0.2, 0.5, 1.0]]
self.assertEqual(
[row.prediction for row in model.setThreshold(0.0).transform(df).collect()],
[1.0, 1.0, 1.0, 1.0],
)
self.assertEqual(
[row.prediction for row in model.setThreshold(0.5).transform(df).collect()],
[0.0, 1.0, 1.0, 0.0],
)
self.assertEqual(
[row.prediction for row in model.setThreshold(1.0).transform(df).collect()],
[0.0, 0.0, 0.0, 0.0],
)
    def test_binary_logistic_regression_summary(self):
        """Exercise every accessor of the binary LogisticRegression training
        summary, and verify model.evaluate() on the training data yields an
        equivalent BinaryLogisticRegressionSummary."""
        df = self.spark.createDataFrame(
            [(1.0, 2.0, Vectors.dense(1.0)), (0.0, 2.0, Vectors.sparse(1, [], []))],
            ["label", "weight", "features"],
        )
        lr = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight", fitIntercept=False)
        model = lr.fit(df)
        self.assertEqual(lr.uid, model.uid)
        self.assertTrue(model.hasSummary)
        s = model.summary
        # test that api is callable and returns expected types
        self.assertTrue(isinstance(s.predictions, DataFrame))
        self.assertEqual(s.probabilityCol, "probability")
        self.assertEqual(s.labelCol, "label")
        self.assertEqual(s.featuresCol, "features")
        self.assertEqual(s.predictionCol, "prediction")
        objHist = s.objectiveHistory
        self.assertTrue(isinstance(objHist, list) and isinstance(objHist[0], float))
        self.assertGreater(s.totalIterations, 0)
        self.assertTrue(isinstance(s.labels, list))
        self.assertTrue(isinstance(s.truePositiveRateByLabel, list))
        self.assertTrue(isinstance(s.falsePositiveRateByLabel, list))
        self.assertTrue(isinstance(s.precisionByLabel, list))
        self.assertTrue(isinstance(s.recallByLabel, list))
        self.assertTrue(isinstance(s.fMeasureByLabel(), list))
        self.assertTrue(isinstance(s.fMeasureByLabel(1.0), list))
        self.assertTrue(isinstance(s.roc, DataFrame))
        self.assertAlmostEqual(s.areaUnderROC, 1.0, 2)
        self.assertTrue(isinstance(s.pr, DataFrame))
        self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame))
        self.assertTrue(isinstance(s.precisionByThreshold, DataFrame))
        self.assertTrue(isinstance(s.recallByThreshold, DataFrame))
        self.assertAlmostEqual(s.accuracy, 1.0, 2)
        self.assertAlmostEqual(s.weightedTruePositiveRate, 1.0, 2)
        self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.0, 2)
        self.assertAlmostEqual(s.weightedRecall, 1.0, 2)
        self.assertAlmostEqual(s.weightedPrecision, 1.0, 2)
        self.assertAlmostEqual(s.weightedFMeasure(), 1.0, 2)
        self.assertAlmostEqual(s.weightedFMeasure(1.0), 1.0, 2)
        # test evaluation (with training dataset) produces a summary with same values
        # one check is enough to verify a summary is returned, Scala version runs full test
        sameSummary = model.evaluate(df)
        self.assertTrue(isinstance(sameSummary, BinaryLogisticRegressionSummary))
        self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC)
        self.assertEqual(sorted(sameSummary.predictions.collect()), sorted(s.predictions.collect()))
    def test_multiclass_logistic_regression_summary(self):
        """Exercise the multiclass LogisticRegression training summary
        accessors; evaluate() must return a (non-binary) summary with matching
        metrics and predictions."""
        df = self.spark.createDataFrame(
            [
                (1.0, 2.0, Vectors.dense(1.0)),
                (0.0, 2.0, Vectors.sparse(1, [], [])),
                (2.0, 2.0, Vectors.dense(2.0)),
                (2.0, 2.0, Vectors.dense(1.9)),
            ],
            ["label", "weight", "features"],
        )
        lr = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight", fitIntercept=False)
        model = lr.fit(df)
        self.assertTrue(model.hasSummary)
        s = model.summary
        # test that api is callable and returns expected types
        self.assertTrue(isinstance(s.predictions, DataFrame))
        self.assertEqual(s.probabilityCol, "probability")
        self.assertEqual(s.labelCol, "label")
        self.assertEqual(s.featuresCol, "features")
        self.assertEqual(s.predictionCol, "prediction")
        objHist = s.objectiveHistory
        self.assertTrue(isinstance(objHist, list) and isinstance(objHist[0], float))
        self.assertGreater(s.totalIterations, 0)
        self.assertTrue(isinstance(s.labels, list))
        self.assertTrue(isinstance(s.truePositiveRateByLabel, list))
        self.assertTrue(isinstance(s.falsePositiveRateByLabel, list))
        self.assertTrue(isinstance(s.precisionByLabel, list))
        self.assertTrue(isinstance(s.recallByLabel, list))
        self.assertTrue(isinstance(s.fMeasureByLabel(), list))
        self.assertTrue(isinstance(s.fMeasureByLabel(1.0), list))
        self.assertAlmostEqual(s.accuracy, 0.75, 2)
        self.assertAlmostEqual(s.weightedTruePositiveRate, 0.75, 2)
        self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.25, 2)
        self.assertAlmostEqual(s.weightedRecall, 0.75, 2)
        self.assertAlmostEqual(s.weightedPrecision, 0.583, 2)
        self.assertAlmostEqual(s.weightedFMeasure(), 0.65, 2)
        self.assertAlmostEqual(s.weightedFMeasure(1.0), 0.65, 2)
        # test evaluation (with training dataset) produces a summary with same values
        # one check is enough to verify a summary is returned, Scala version runs full test
        sameSummary = model.evaluate(df)
        self.assertTrue(isinstance(sameSummary, LogisticRegressionSummary))
        self.assertFalse(isinstance(sameSummary, BinaryLogisticRegressionSummary))
        self.assertAlmostEqual(sameSummary.accuracy, s.accuracy)
        # We can't use sorted(s.predictions.collect()), since the DenseVector doesn't support "<"
        self.assertEqual(
            sameSummary.predictions.coalesce(1).sort("label", "weight", "prediction").collect(),
            s.predictions.coalesce(1).sort("label", "weight", "prediction").collect(),
        )
def test_logistic_regression(self):
# test sparse/dense vector and matrix
lower_intercepts = Vectors.dense([1, 2, 3, 4])
upper_intercepts = Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
lower_coefficients = Matrices.dense(3, 2, [0, 1, 4, 5, 9, 10])
upper_coefficients = Matrices.sparse(1, 1, [0, 1], [0], [2.0])
lr = LogisticRegression(
maxIter=1,
lowerBoundsOnIntercepts=lower_intercepts,
upperBoundsOnIntercepts=upper_intercepts,
lowerBoundsOnCoefficients=lower_coefficients,
upperBoundsOnCoefficients=upper_coefficients,
)
path = tempfile.mkdtemp()
lr_path = path + "/logreg"
lr.save(lr_path)
lr2 = LogisticRegression.load(lr_path)
self.assertEqual(
lr2.uid,
lr2.maxIter.parent,
"Loaded LogisticRegression instance uid (%s) "
"did not match Param's uid (%s)" % (lr2.uid, lr2.maxIter.parent),
)
self.assertEqual(
lr._defaultParamMap[lr.maxIter],
lr2._defaultParamMap[lr2.maxIter],
"Loaded LogisticRegression instance default params did not match "
+ "original defaults",
)
self.assertEqual(
lr.getLowerBoundsOnIntercepts(),
lr2.getLowerBoundsOnIntercepts(),
)
self.assertEqual(
lr.getUpperBoundsOnIntercepts(),
lr2.getUpperBoundsOnIntercepts(),
)
self.assertEqual(
lr.getLowerBoundsOnCoefficients(),
lr2.getLowerBoundsOnCoefficients(),
)
self.assertEqual(
lr.getUpperBoundsOnCoefficients(),
lr2.getUpperBoundsOnCoefficients(),
)
try:
rmtree(path)
except OSError:
pass
    def test_linear_svc(self):
        """Train a small LinearSVC and verify params, coefficients, local
        prediction APIs, transform schema, training/evaluation summaries, and
        estimator/model save & load."""
        df = (
            self.spark.createDataFrame(
                [
                    (1.0, 1.0, Vectors.dense(0.0, 5.0)),
                    (0.0, 2.0, Vectors.dense(1.0, 2.0)),
                    (1.0, 3.0, Vectors.dense(2.0, 1.0)),
                    (0.0, 4.0, Vectors.dense(3.0, 3.0)),
                ],
                ["label", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("weight")
        )
        svc = LinearSVC(maxIter=1, regParam=1.0)
        self.assertEqual(svc.getMaxIter(), 1)
        self.assertEqual(svc.getRegParam(), 1.0)
        model = svc.fit(df)
        self.assertEqual(svc.uid, model.uid)
        self.assertEqual(model.numClasses, 2)
        self.assertEqual(model.numFeatures, 2)
        self.assertTrue(np.allclose(model.intercept, 0.025877458475338313, atol=1e-4))
        self.assertTrue(
            np.allclose(model.coefficients.toArray(), [-0.03622844, 0.01035098], atol=1e-4)
        )
        vec = Vectors.dense(0.0, 5.0)
        self.assertEqual(model.predict(vec), 1.0)
        self.assertTrue(
            np.allclose(model.predictRaw(vec).toArray(), [-0.07763238, 0.07763238], atol=1e-4)
        )
        output = model.transform(df)
        # note: LinearSVC emits no "probability" column
        expected_cols = [
            "label",
            "weight",
            "features",
            "rawPrediction",
            "prediction",
        ]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 4)
        # model summary
        # NOTE(review): `summary` is invoked as a method here but accessed as a
        # property in the random forest tests — confirm the intended API shape.
        self.assertTrue(model.hasSummary)
        summary = model.summary()
        self.assertIsInstance(summary, LinearSVCSummary)
        self.assertIsInstance(summary, LinearSVCTrainingSummary)
        self.assertEqual(summary.labels, [0.0, 1.0])
        self.assertEqual(summary.accuracy, 0.5)
        self.assertEqual(summary.areaUnderROC, 0.75)
        self.assertEqual(summary.predictions.columns, expected_cols)
        summary2 = model.evaluate(df)
        self.assertIsInstance(summary2, LinearSVCSummary)
        self.assertFalse(isinstance(summary2, LinearSVCTrainingSummary))
        self.assertEqual(summary2.labels, [0.0, 1.0])
        self.assertEqual(summary2.accuracy, 0.5)
        self.assertEqual(summary2.areaUnderROC, 0.75)
        self.assertEqual(summary2.predictions.columns, expected_cols)
        # Model save & load
        with tempfile.TemporaryDirectory(prefix="linear_svc") as d:
            svc.write().overwrite().save(d)
            svc2 = LinearSVC.load(d)
            self.assertEqual(str(svc), str(svc2))
            model.write().overwrite().save(d)
            model2 = LinearSVCModel.load(d)
            self.assertEqual(str(model), str(model2))
    def test_factorization_machine(self):
        """Train a small FMClassifier and verify params, model coefficients
        (intercept/linear/factors), local prediction APIs, transform schema,
        training/evaluation summaries, and estimator/model save & load."""
        spark = self.spark
        df = (
            spark.createDataFrame(
                [
                    (1.0, 1.0, Vectors.dense(0.0, 5.0)),
                    (0.0, 2.0, Vectors.dense(1.0, 2.0)),
                    (1.0, 3.0, Vectors.dense(2.0, 1.0)),
                    (0.0, 4.0, Vectors.dense(3.0, 3.0)),
                ],
                ["label", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("weight")
        )
        fm = FMClassifier(factorSize=2, maxIter=1, regParam=1.0, seed=1)
        self.assertEqual(fm.getFactorSize(), 2)
        self.assertEqual(fm.getMaxIter(), 1)
        self.assertEqual(fm.getRegParam(), 1.0)
        self.assertEqual(fm.getSeed(), 1)
        model = fm.fit(df)
        self.assertEqual(fm.uid, model.uid)
        self.assertEqual(model.numClasses, 2)
        self.assertEqual(model.numFeatures, 2)
        self.assertTrue(
            np.allclose(model.intercept, 0.9999070647126924, atol=1e-4), model.intercept
        )
        self.assertTrue(
            np.allclose(
                model.linear.toArray(), [-0.999999959956255, 0.9999999201744205], atol=1e-4
            ),
            model.linear,
        )
        self.assertTrue(
            np.allclose(
                model.factors.toArray(),
                [[0.99999918, 0.99999858], [-0.99999943, 0.99999854]],
                atol=1e-4,
            ),
            model.factors,
        )
        vec = Vectors.dense(0.0, 5.0)
        pred = model.predict(vec)
        self.assertEqual(pred, 1.0)
        pred = model.predictRaw(vec)
        self.assertTrue(
            np.allclose(pred.toArray(), [-5.9999066655847955, 5.9999066655847955], atol=1e-4),
            pred,
        )
        pred = model.predictProbability(vec)
        self.assertTrue(
            np.allclose(pred.toArray(), [0.002472853377527451, 0.9975271466224725], atol=1e-4),
            pred,
        )
        output = model.transform(df)
        expected_cols = [
            "label",
            "weight",
            "features",
            "rawPrediction",
            "probability",
            "prediction",
        ]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 4)
        # model summary
        # NOTE(review): `summary` is invoked as a method here but accessed as a
        # property in the random forest tests — confirm the intended API shape.
        self.assertTrue(model.hasSummary)
        summary = model.summary()
        self.assertIsInstance(summary, FMClassificationSummary)
        self.assertIsInstance(summary, FMClassificationTrainingSummary)
        self.assertEqual(summary.labels, [0.0, 1.0])
        self.assertEqual(summary.accuracy, 0.25)
        self.assertEqual(summary.areaUnderROC, 0.5)
        self.assertEqual(summary.predictions.columns, expected_cols)
        summary2 = model.evaluate(df)
        self.assertIsInstance(summary2, FMClassificationSummary)
        self.assertFalse(isinstance(summary2, FMClassificationTrainingSummary))
        self.assertEqual(summary2.labels, [0.0, 1.0])
        self.assertEqual(summary2.accuracy, 0.25)
        self.assertEqual(summary2.areaUnderROC, 0.5)
        self.assertEqual(summary2.predictions.columns, expected_cols)
        # Model save & load
        with tempfile.TemporaryDirectory(prefix="factorization_machine") as d:
            fm.write().overwrite().save(d)
            fm2 = FMClassifier.load(d)
            self.assertEqual(str(fm), str(fm2))
            model.write().overwrite().save(d)
            model2 = FMClassificationModel.load(d)
            self.assertEqual(str(model), str(model2))
    def test_decision_tree_classifier(self):
        """Train a depth-2 DecisionTreeClassifier and verify params, tree
        structure/importances, local prediction APIs (including predictLeaf),
        transform schema, and estimator/model save & load."""
        df = (
            self.spark.createDataFrame(
                [
                    (1.0, 1.0, Vectors.dense(0.0, 5.0)),
                    (0.0, 2.0, Vectors.dense(1.0, 2.0)),
                    (1.0, 3.0, Vectors.dense(2.0, 1.0)),
                    (0.0, 4.0, Vectors.dense(3.0, 3.0)),
                ],
                ["label", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("weight")
        )
        dt = DecisionTreeClassifier(
            maxDepth=2,
            labelCol="label",
            leafCol="leaf",
            seed=1,
        )
        self.assertEqual(dt.getMaxDepth(), 2)
        self.assertEqual(dt.getSeed(), 1)
        self.assertEqual(dt.getLabelCol(), "label")
        self.assertEqual(dt.getLeafCol(), "leaf")
        model = dt.fit(df)
        self.assertEqual(dt.uid, model.uid)
        self.assertEqual(model.numClasses, 2)
        self.assertEqual(model.numFeatures, 2)
        self.assertEqual(model.depth, 2)
        self.assertEqual(model.numNodes, 5)
        self.assertEqual(model.featureImportances, Vectors.sparse(2, [0], [1.0]))
        self.assertTrue("depth=2, numNodes=5, numClasses=2, numFeatures=2" in model.toDebugString)
        self.assertTrue("If (feature 0 <= 2.5)" in model.toDebugString)
        vec = Vectors.dense(0.0, 5.0)
        self.assertEqual(model.predict(vec), 1.0)
        self.assertEqual(model.predictRaw(vec), Vectors.dense(0.0, 1.0))
        self.assertEqual(model.predictProbability(vec), Vectors.dense(0.0, 1.0))
        self.assertEqual(model.predictLeaf(vec), 0.0)
        output = model.transform(df)
        # leafCol adds a trailing "leaf" column to the output
        expected_cols = [
            "label",
            "weight",
            "features",
            "rawPrediction",
            "probability",
            "prediction",
            "leaf",
        ]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 4)
        # save & load
        with tempfile.TemporaryDirectory(prefix="decision_tree") as d:
            dt.write().overwrite().save(d)
            dt2 = DecisionTreeClassifier.load(d)
            self.assertEqual(str(dt), str(dt2))
            model.write().overwrite().save(d)
            model2 = DecisionTreeClassificationModel.load(d)
            self.assertEqual(str(model), str(model2))
            self.assertEqual(model.toDebugString, model2.toDebugString)
    def test_gbt_classifier(self):
        """Train a 3-iteration GBTClassifier and verify ensemble metadata,
        local prediction APIs, evaluateEachIteration, per-tree access
        (regression trees), transform schema, and save & load."""
        df = (
            self.spark.createDataFrame(
                [
                    (1.0, 1.0, Vectors.dense(0.0, 5.0)),
                    (0.0, 2.0, Vectors.dense(1.0, 2.0)),
                    (1.0, 3.0, Vectors.dense(2.0, 1.0)),
                    (0.0, 4.0, Vectors.dense(3.0, 3.0)),
                ],
                ["label", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("weight")
        )
        gbt = GBTClassifier(
            maxIter=3,
            maxDepth=2,
            labelCol="label",
            leafCol="leaf",
            seed=1,
        )
        self.assertEqual(gbt.getMaxIter(), 3)
        self.assertEqual(gbt.getMaxDepth(), 2)
        self.assertEqual(gbt.getSeed(), 1)
        self.assertEqual(gbt.getLabelCol(), "label")
        self.assertEqual(gbt.getLeafCol(), "leaf")
        model = gbt.fit(df)
        self.assertEqual(gbt.uid, model.uid)
        self.assertEqual(model.numClasses, 2)
        self.assertEqual(model.numFeatures, 2)
        # TODO(SPARK-50843): Support access submodel in TreeEnsembleModel
        # model.trees
        # first tree has full weight, later boosting stages use the step size
        self.assertEqual(model.treeWeights, [1.0, 0.1, 0.1])
        self.assertEqual(model.totalNumNodes, 15)
        self.assertTrue(np.allclose(model.featureImportances, [0.3333, 0.6667], atol=1e-4))
        self.assertTrue("numTrees=3, numClasses=2, numFeatures=2" in model.toDebugString)
        self.assertTrue("If (feature 0 <= 0.5)" in model.toDebugString)
        vec = Vectors.dense(0.0, 5.0)
        self.assertEqual(model.predict(vec), 1.0)
        self.assertTrue(np.allclose(model.predictRaw(vec), [-1.0915, 1.0915], atol=1e-4))
        self.assertTrue(np.allclose(model.predictProbability(vec), [0.1013, 0.8987], atol=1e-4))
        self.assertEqual(model.predictLeaf(vec), Vectors.dense(0.0, 0.0, 0.0))
        # GBT-specific method: evaluateEachIteration
        self.assertTrue(
            np.allclose(
                model.evaluateEachIteration(df),
                [0.253856022085945, 0.23205304779013333, 0.21358401299568353],
                atol=1e-4,
            )
        )
        output = model.transform(df)
        expected_cols = [
            "label",
            "weight",
            "features",
            "rawPrediction",
            "probability",
            "prediction",
            "leaf",
        ]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 4)
        trees = model.trees
        self.assertEqual(len(trees), 3)
        for tree in trees:
            # GBT submodels are regression trees, not classifiers
            self.assertIsInstance(tree, DecisionTreeRegressionModel)
            self.assertTrue(tree.predict(vec) > -10)
            self.assertEqual(tree.transform(df).count(), 4)
            self.assertEqual(
                tree.transform(df).columns,
                ["label", "weight", "features", "prediction", "leaf"],
            )
        # save & load
        with tempfile.TemporaryDirectory(prefix="gbt_classification") as d:
            gbt.write().overwrite().save(d)
            gbt2 = GBTClassifier.load(d)
            self.assertEqual(str(gbt), str(gbt2))
            model.write().overwrite().save(d)
            model2 = GBTClassificationModel.load(d)
            self.assertEqual(str(model), str(model2))
            self.assertEqual(model.toDebugString, model2.toDebugString)
def test_binary_random_forest_classifier(self):
df = (
self.spark.createDataFrame(
[
(1.0, 1.0, Vectors.dense(0.0, 5.0)),
(0.0, 2.0, Vectors.dense(1.0, 2.0)),
(1.0, 3.0, Vectors.dense(2.0, 1.0)),
(0.0, 4.0, Vectors.dense(3.0, 3.0)),
],
["label", "weight", "features"],
)
.coalesce(1)
.sortWithinPartitions("weight")
)
rf = RandomForestClassifier(
numTrees=3,
maxDepth=2,
labelCol="label",
leafCol="leaf",
seed=1,
)
self.assertEqual(rf.getNumTrees(), 3)
self.assertEqual(rf.getMaxDepth(), 2)
self.assertEqual(rf.getSeed(), 1)
self.assertEqual(rf.getLabelCol(), "label")
self.assertEqual(rf.getLeafCol(), "leaf")
model = rf.fit(df)
self.assertEqual(rf.uid, model.uid)
self.assertEqual(model.numClasses, 2)
self.assertEqual(model.numFeatures, 2)
self.assertEqual(model.treeWeights, [1.0, 1.0, 1.0])
self.assertEqual(model.totalNumNodes, 9)
self.assertTrue(np.allclose(model.featureImportances, [0.7778, 0.2222], atol=1e-4))
self.assertTrue("numTrees=3, numClasses=2, numFeatures=2" in model.toDebugString)
self.assertTrue("If (feature 0 <= 0.5)" in model.toDebugString)
vec = Vectors.dense(0.0, 5.0)
self.assertEqual(model.predict(vec), 1.0)
self.assertEqual(model.predictRaw(vec), Vectors.dense(1.0, 2.0))
self.assertTrue(np.allclose(model.predictProbability(vec), [0.3333, 0.6667], atol=1e-4))
self.assertEqual(model.predictLeaf(vec), Vectors.dense(0.0, 0.0, 1.0))
output = model.transform(df)
expected_cols = [
"label",
"weight",
"features",
"rawPrediction",
"probability",
"prediction",
"leaf",
]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 4)
trees = model.trees
self.assertEqual(len(trees), 3)
for tree in trees:
self.assertIsInstance(tree, DecisionTreeClassificationModel)
self.assertTrue(tree.predict(vec) > -10)
self.assertEqual(tree.transform(df).count(), 4)
self.assertEqual(tree.transform(df).columns, expected_cols)
# model summary
summary = model.summary
self.assertTrue(isinstance(summary, BinaryRandomForestClassificationSummary))
self.assertTrue(isinstance(summary, BinaryRandomForestClassificationTrainingSummary))
self.assertEqual(summary.labels, [0.0, 1.0])
self.assertEqual(summary.accuracy, 0.75)
self.assertEqual(summary.areaUnderROC, 0.875)
self.assertEqual(summary.predictions.columns, expected_cols)
summary2 = model.evaluate(df)
self.assertTrue(isinstance(summary2, BinaryRandomForestClassificationSummary))
self.assertFalse(isinstance(summary2, BinaryRandomForestClassificationTrainingSummary))
self.assertEqual(summary2.labels, [0.0, 1.0])
self.assertEqual(summary2.accuracy, 0.75)
self.assertEqual(summary.areaUnderROC, 0.875)
self.assertEqual(summary2.predictions.columns, expected_cols)
# Model save & load
with tempfile.TemporaryDirectory(prefix="binary_random_forest") as d:
rf.write().overwrite().save(d)
rf2 = RandomForestClassifier.load(d)
self.assertEqual(str(rf), str(rf2))
model.write().overwrite().save(d)
model2 = RandomForestClassificationModel.load(d)
self.assertEqual(str(model), str(model2))
self.assertEqual(model.toDebugString, model2.toDebugString)
    def test_multiclass_random_forest_classifier(self):
        """Train a 3-class RandomForestClassifier and verify ensemble
        metadata, local prediction APIs, transform schema, (non-binary)
        training/evaluation summaries, and save & load."""
        df = (
            self.spark.createDataFrame(
                [
                    (1.0, 1.0, Vectors.dense(0.0, 5.0)),
                    (0.0, 2.0, Vectors.dense(1.0, 2.0)),
                    (1.0, 3.0, Vectors.dense(2.0, 1.0)),
                    (2.0, 4.0, Vectors.dense(3.0, 3.0)),
                ],
                ["label", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("weight")
        )
        rf = RandomForestClassifier(
            numTrees=3,
            maxDepth=2,
            labelCol="label",
            leafCol="leaf",
            seed=1,
        )
        self.assertEqual(rf.getNumTrees(), 3)
        self.assertEqual(rf.getMaxDepth(), 2)
        self.assertEqual(rf.getSeed(), 1)
        self.assertEqual(rf.getLabelCol(), "label")
        self.assertEqual(rf.getLeafCol(), "leaf")
        model = rf.fit(df)
        self.assertEqual(rf.uid, model.uid)
        self.assertEqual(model.numClasses, 3)
        self.assertEqual(model.numFeatures, 2)
        self.assertEqual(len(model.trees), 3)
        self.assertEqual(model.treeWeights, [1.0, 1.0, 1.0])
        self.assertEqual(model.totalNumNodes, 9)
        self.assertEqual(model.featureImportances, Vectors.sparse(2, [0], [1.0]))
        self.assertTrue("numTrees=3, numClasses=3, numFeatures=2" in model.toDebugString)
        self.assertTrue("If (feature 0 <= 1.5)" in model.toDebugString)
        vec = Vectors.dense(0.0, 5.0)
        self.assertEqual(model.predict(vec), 0.0)
        self.assertEqual(model.predictRaw(vec), Vectors.dense(1.5, 1.5, 0.0))
        self.assertTrue(np.allclose(model.predictProbability(vec), [0.5, 0.5, 0.0], atol=1e-4))
        self.assertEqual(model.predictLeaf(vec), Vectors.dense(0.0, 0.0, 0.0))
        output = model.transform(df)
        expected_cols = [
            "label",
            "weight",
            "features",
            "rawPrediction",
            "probability",
            "prediction",
            "leaf",
        ]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 4)
        # model summary (multiclass: no areaUnderROC, unlike the binary test)
        summary = model.summary
        self.assertTrue(isinstance(summary, RandomForestClassificationSummary))
        self.assertTrue(isinstance(summary, RandomForestClassificationTrainingSummary))
        self.assertEqual(summary.labels, [0.0, 1.0, 2.0])
        self.assertEqual(summary.accuracy, 0.5)
        self.assertEqual(summary.predictions.columns, expected_cols)
        summary2 = model.evaluate(df)
        self.assertTrue(isinstance(summary2, RandomForestClassificationSummary))
        self.assertFalse(isinstance(summary2, RandomForestClassificationTrainingSummary))
        self.assertEqual(summary2.labels, [0.0, 1.0, 2.0])
        self.assertEqual(summary2.accuracy, 0.5)
        self.assertEqual(summary2.predictions.columns, expected_cols)
        # save & load
        with tempfile.TemporaryDirectory(prefix="multiclass_random_forest") as d:
            rf.write().overwrite().save(d)
            rf2 = RandomForestClassifier.load(d)
            self.assertEqual(str(rf), str(rf2))
            model.write().overwrite().save(d)
            model2 = RandomForestClassificationModel.load(d)
            self.assertEqual(str(model), str(model2))
            self.assertEqual(model.toDebugString, model2.toDebugString)
    def test_mlp(self):
        """Train a one-iteration MultilayerPerceptronClassifier (layers [2, 2])
        and verify params, learned weights, local prediction APIs, transform
        schema, training/evaluation summaries, and save & load."""
        df = (
            self.spark.createDataFrame(
                [
                    (1.0, 1.0, Vectors.dense(0.0, 5.0)),
                    (0.0, 2.0, Vectors.dense(1.0, 2.0)),
                    (1.0, 3.0, Vectors.dense(2.0, 1.0)),
                    (0.0, 4.0, Vectors.dense(3.0, 3.0)),
                ],
                ["label", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("weight")
        )
        mlp = MultilayerPerceptronClassifier(
            layers=[2, 2],
            maxIter=1,
            seed=1,
        )
        self.assertEqual(mlp.getLayers(), [2, 2])
        self.assertEqual(mlp.getMaxIter(), 1)
        self.assertEqual(mlp.getSeed(), 1)
        model = mlp.fit(df)
        self.assertEqual(mlp.uid, model.uid)
        self.assertEqual(model.numClasses, 2)
        self.assertEqual(model.numFeatures, 2)
        # 2x2 weight matrix plus 2 biases -> 6 weights in total
        self.assertTrue(
            np.allclose(
                model.weights.toArray(),
                [
                    0.43562736294302623,
                    0.364580202422002,
                    -1.4112729385978997,
                    -1.2643591053546168,
                    1.1512595235805883,
                    0.7857317704872436,
                ],
                atol=1e-4,
            ),
            model.weights,
        )
        vec = Vectors.dense(0.0, 5.0)
        pred = model.predict(vec)
        self.assertEqual(pred, 1.0)
        pred = model.predictRaw(vec)
        self.assertTrue(
            np.allclose(pred.toArray(), [-5.905105169408911, -5.53606375628584], atol=1e-4), pred
        )
        pred = model.predictProbability(vec)
        self.assertTrue(
            np.allclose(pred.toArray(), [0.4087726702431394, 0.5912273297568605], atol=1e-4), pred
        )
        output = model.transform(df)
        expected_cols = [
            "label",
            "weight",
            "features",
            "rawPrediction",
            "probability",
            "prediction",
        ]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 4)
        # model summary
        # NOTE(review): `summary` is invoked as a method here but accessed as a
        # property in the random forest tests — confirm the intended API shape.
        self.assertTrue(model.hasSummary)
        summary = model.summary()
        self.assertIsInstance(summary, MultilayerPerceptronClassificationSummary)
        self.assertIsInstance(summary, MultilayerPerceptronClassificationTrainingSummary)
        self.assertEqual(summary.labels, [0.0, 1.0])
        self.assertEqual(summary.accuracy, 0.75)
        self.assertEqual(summary.predictions.columns, expected_cols)
        summary2 = model.evaluate(df)
        self.assertIsInstance(summary2, MultilayerPerceptronClassificationSummary)
        self.assertNotIsInstance(summary2, MultilayerPerceptronClassificationTrainingSummary)
        self.assertEqual(summary2.labels, [0.0, 1.0])
        self.assertEqual(summary2.accuracy, 0.75)
        self.assertEqual(summary2.predictions.columns, expected_cols)
        # Model save & load
        with tempfile.TemporaryDirectory(prefix="mlpc") as d:
            mlp.write().overwrite().save(d)
            mlp2 = MultilayerPerceptronClassifier.load(d)
            self.assertEqual(str(mlp), str(mlp2))
            model.write().overwrite().save(d)
            model2 = MultilayerPerceptronClassificationModel.load(d)
            self.assertEqual(str(model), str(model2))
class ClassificationTests(ClassificationTestsMixin, ReusedSQLTestCase):
    """Concrete test case: runs ClassificationTestsMixin with the SparkSession
    provided by ReusedSQLTestCase."""

    pass
if __name__ == "__main__":
    # Re-export the test classes so unittest discovery picks them up by name.
    from pyspark.ml.tests.test_classification import *  # noqa: F401,F403

    try:
        # Prefer XML test reports (used by CI) when xmlrunner is installed.
        import xmlrunner  # type: ignore[import]

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        # Fall back to the default text runner.
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)