blob: 52688fdd63cf2781b779ada4a3d1b4bfca7f2858 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import tempfile
import unittest
import numpy as np
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import (
AFTSurvivalRegression,
AFTSurvivalRegressionModel,
IsotonicRegression,
IsotonicRegressionModel,
LinearRegression,
LinearRegressionModel,
GeneralizedLinearRegression,
GeneralizedLinearRegressionModel,
GeneralizedLinearRegressionSummary,
GeneralizedLinearRegressionTrainingSummary,
LinearRegressionSummary,
LinearRegressionTrainingSummary,
FMRegressor,
FMRegressionModel,
DecisionTreeRegressor,
DecisionTreeRegressionModel,
RandomForestRegressor,
RandomForestRegressionModel,
GBTRegressor,
GBTRegressionModel,
)
from pyspark.sql import is_remote
from pyspark.testing.sqlutils import ReusedSQLTestCase
class RegressionTestsMixin:
def test_aft_survival(self):
spark = self.spark
df = spark.createDataFrame(
[(1.0, Vectors.dense(1.0), 1.0), (1e-40, Vectors.sparse(1, [], []), 0.0)],
["label", "features", "censor"],
)
aft = AFTSurvivalRegression()
aft.setMaxIter(1)
self.assertEqual(aft.getMaxIter(), 1)
model = aft.fit(df)
self.assertEqual(aft.uid, model.uid)
self.assertEqual(model.numFeatures, 1)
self.assertTrue(np.allclose(model.intercept, 0.0, atol=1e-4), model.intercept)
self.assertTrue(
np.allclose(model.coefficients.toArray(), [0.0], atol=1e-4), model.coefficients
)
self.assertTrue(np.allclose(model.scale, 1.0, atol=1e-4), model.scale)
vec = Vectors.dense(6.3)
pred = model.predict(vec)
self.assertEqual(pred, 1.0)
pred = model.predictQuantiles(vec)
self.assertTrue(
np.allclose(
pred,
[
0.010050335853501444,
0.051293294387550536,
0.1053605156578263,
0.2876820724517809,
0.6931471805599453,
1.3862943611198906,
2.302585092994046,
2.9957322735539895,
4.60517018598809,
],
atol=1e-4,
),
pred,
)
output = model.transform(df)
expected_cols = ["label", "features", "censor", "prediction"]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 2)
# Model save & load
with tempfile.TemporaryDirectory(prefix="aft_survival") as d:
aft.write().overwrite().save(d)
aft2 = AFTSurvivalRegression.load(d)
self.assertEqual(str(aft), str(aft2))
model.write().overwrite().save(d)
model2 = AFTSurvivalRegressionModel.load(d)
self.assertEqual(str(model), str(model2))
def test_isotonic_regression(self):
spark = self.spark
df = spark.createDataFrame(
[(1.0, Vectors.dense(1.0)), (0.0, Vectors.sparse(1, [], []))], ["label", "features"]
)
ir = IsotonicRegression(
isotonic=True,
featureIndex=0,
)
self.assertTrue(ir.getIsotonic())
self.assertEqual(ir.getFeatureIndex(), 0)
model = ir.fit(df)
self.assertEqual(model.numFeatures, 1)
self.assertTrue(
np.allclose(model.boundaries.toArray(), [0.0, 1.0], atol=1e-4), model.boundaries
)
self.assertTrue(
np.allclose(model.predictions.toArray(), [0.0, 1.0], atol=1e-4), model.predictions
)
pred = model.predict(1.0)
self.assertTrue(np.allclose(pred, 1.0, atol=1e-4), pred)
output = model.transform(df)
expected_cols = ["label", "features", "prediction"]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 2)
# Model save & load
with tempfile.TemporaryDirectory(prefix="isotonic_regression") as d:
ir.write().overwrite().save(d)
ir2 = IsotonicRegression.load(d)
self.assertEqual(str(ir), str(ir2))
model.write().overwrite().save(d)
model2 = IsotonicRegressionModel.load(d)
self.assertEqual(str(model), str(model2))
def test_linear_regression(self):
spark = self.spark
df = (
spark.createDataFrame(
[
(1.0, 1.0, Vectors.dense(0.0, 5.0)),
(0.0, 2.0, Vectors.dense(1.0, 2.0)),
(1.5, 3.0, Vectors.dense(2.0, 1.0)),
(0.7, 4.0, Vectors.dense(1.5, 3.0)),
],
["label", "weight", "features"],
)
.coalesce(1)
.sortWithinPartitions("weight")
)
lr = LinearRegression(
regParam=0.0,
maxIter=2,
solver="normal",
weightCol="weight",
)
self.assertEqual(lr.getRegParam(), 0)
self.assertEqual(lr.getMaxIter(), 2)
self.assertEqual(lr.getSolver(), "normal")
self.assertEqual(lr.getWeightCol(), "weight")
model = lr.fit(df)
self.assertEqual(lr.uid, model.uid)
self.assertEqual(model.numFeatures, 2)
self.assertTrue(np.allclose(model.scale, 1.0, atol=1e-4))
self.assertTrue(np.allclose(model.intercept, -0.35, atol=1e-4))
self.assertTrue(np.allclose(model.coefficients, [0.65, 0.1125], atol=1e-4))
output = model.transform(df)
expected_cols = [
"label",
"weight",
"features",
"prediction",
]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 4)
self.assertTrue(
np.allclose(model.predict(Vectors.dense(0.0, 5.0)), 0.21249999999999963, atol=1e-4)
)
def check_summary():
# Model summary
summary = model.summary
self.assertTrue(isinstance(summary, LinearRegressionSummary))
self.assertTrue(isinstance(summary, LinearRegressionTrainingSummary))
self.assertEqual(summary.predictions.columns, expected_cols)
self.assertEqual(summary.predictions.count(), 4)
self.assertEqual(summary.residuals.columns, ["residuals"])
self.assertEqual(summary.residuals.count(), 4)
self.assertEqual(summary.degreesOfFreedom, 1)
self.assertEqual(summary.numInstances, 4)
self.assertEqual(summary.objectiveHistory, [0.0])
self.assertTrue(
np.allclose(
summary.coefficientStandardErrors,
[1.2859821149611763, 0.6248749874975031, 3.1645497310044184],
atol=1e-4,
)
)
self.assertTrue(
np.allclose(
summary.devianceResiduals, [-0.7424621202458727, 0.7875000000000003], atol=1e-4
)
)
self.assertTrue(
np.allclose(
summary.pValues,
[0.7020630236843428, 0.8866003086182783, 0.9298746994547682],
atol=1e-4,
)
)
self.assertTrue(
np.allclose(
summary.tValues,
[0.5054502643838291, 0.1800360108036021, -0.11060025272186746],
atol=1e-4,
)
)
self.assertTrue(np.allclose(summary.explainedVariance, 0.07997500000000031, atol=1e-4))
self.assertTrue(np.allclose(summary.meanAbsoluteError, 0.4200000000000002, atol=1e-4))
self.assertTrue(np.allclose(summary.meanSquaredError, 0.20212500000000005, atol=1e-4))
self.assertTrue(
np.allclose(summary.rootMeanSquaredError, 0.44958314025327956, atol=1e-4)
)
self.assertTrue(np.allclose(summary.r2, 0.4427212572373862, atol=1e-4))
self.assertTrue(np.allclose(summary.r2adj, -0.6718362282878414, atol=1e-4))
check_summary()
if is_remote():
self.spark.client._delete_ml_cache([model._java_obj._ref_id], evict_only=True)
check_summary()
summary2 = model.evaluate(df)
self.assertTrue(isinstance(summary2, LinearRegressionSummary))
self.assertFalse(isinstance(summary2, LinearRegressionTrainingSummary))
self.assertEqual(summary2.predictions.columns, expected_cols)
self.assertEqual(summary2.predictions.count(), 4)
self.assertEqual(summary2.residuals.columns, ["residuals"])
self.assertEqual(summary2.residuals.count(), 4)
self.assertEqual(summary2.degreesOfFreedom, 1)
self.assertEqual(summary2.numInstances, 4)
self.assertTrue(
np.allclose(
summary2.devianceResiduals, [-0.7424621202458727, 0.7875000000000003], atol=1e-4
)
)
self.assertTrue(np.allclose(summary2.explainedVariance, 0.07997500000000031, atol=1e-4))
self.assertTrue(np.allclose(summary2.meanAbsoluteError, 0.4200000000000002, atol=1e-4))
self.assertTrue(np.allclose(summary2.meanSquaredError, 0.20212500000000005, atol=1e-4))
self.assertTrue(np.allclose(summary2.rootMeanSquaredError, 0.44958314025327956, atol=1e-4))
self.assertTrue(np.allclose(summary2.r2, 0.4427212572373862, atol=1e-4))
self.assertTrue(np.allclose(summary2.r2adj, -0.6718362282878414, atol=1e-4))
# Model save & load
with tempfile.TemporaryDirectory(prefix="linear_regression") as d:
lr.write().overwrite().save(d)
lr2 = LinearRegression.load(d)
self.assertEqual(str(lr), str(lr2))
model.write().overwrite().save(d)
model2 = LinearRegressionModel.load(d)
self.assertEqual(str(model), str(model2))
def test_generalized_linear_regression(self):
spark = self.spark
df = (
spark.createDataFrame(
[
(1, 1.0, Vectors.dense(0.0, 0.0)),
(2, 1.0, Vectors.dense(1.0, 2.0)),
(3, 2.0, Vectors.dense(0.0, 0.0)),
(4, 2.0, Vectors.dense(1.0, 1.0)),
],
["index", "label", "features"],
)
.coalesce(1)
.sortWithinPartitions("index")
.select("label", "features")
)
glr = GeneralizedLinearRegression(
family="gaussian",
link="identity",
linkPredictionCol="p",
)
glr.setRegParam(0.1)
glr.setMaxIter(1)
self.assertEqual(glr.getFamily(), "gaussian")
self.assertEqual(glr.getLink(), "identity")
self.assertEqual(glr.getLinkPredictionCol(), "p")
self.assertEqual(glr.getRegParam(), 0.1)
self.assertEqual(glr.getMaxIter(), 1)
model = glr.fit(df)
self.assertTrue(np.allclose(model.intercept, 1.543859649122807, atol=1e-4), model.intercept)
self.assertTrue(
np.allclose(model.coefficients.toArray(), [0.43859649, -0.35087719], atol=1e-4),
model.coefficients,
)
self.assertEqual(model.numFeatures, 2)
vec = Vectors.dense(1.0, 2.0)
pred = model.predict(vec)
self.assertTrue(np.allclose(pred, 1.280701754385965, atol=1e-4), pred)
expected_cols = ["label", "features", "p", "prediction"]
output = model.transform(df)
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 4)
def check_summary():
# Model summary
self.assertTrue(model.hasSummary)
summary = model.summary
self.assertIsInstance(summary, GeneralizedLinearRegressionSummary)
self.assertIsInstance(summary, GeneralizedLinearRegressionTrainingSummary)
self.assertEqual(summary.numIterations, 1)
self.assertEqual(summary.numInstances, 4)
self.assertEqual(summary.rank, 3)
self.assertTrue(
np.allclose(
summary.tValues,
[0.3725037662281711, -0.49418209022924164, 2.6589353685797654],
atol=1e-4,
),
summary.tValues,
)
self.assertTrue(
np.allclose(
summary.pValues,
[0.7729938686180984, 0.707802691825973, 0.22900885781807023],
atol=1e-4,
),
summary.pValues,
)
self.assertEqual(summary.predictions.columns, expected_cols)
self.assertEqual(summary.predictions.count(), 4)
self.assertEqual(summary.residuals().columns, ["devianceResiduals"])
self.assertEqual(summary.residuals().count(), 4)
check_summary()
if is_remote():
self.spark.client._delete_ml_cache([model._java_obj._ref_id], evict_only=True)
check_summary()
summary = model.summary
summary2 = model.evaluate(df)
self.assertIsInstance(summary2, GeneralizedLinearRegressionSummary)
self.assertNotIsInstance(summary2, GeneralizedLinearRegressionTrainingSummary)
self.assertEqual(summary2.numInstances, 4)
self.assertEqual(summary2.rank, 3)
self.assertEqual(summary.predictions.columns, expected_cols)
self.assertEqual(summary.predictions.count(), 4)
self.assertEqual(summary2.residuals().columns, ["devianceResiduals"])
self.assertEqual(summary2.residuals().count(), 4)
# Model save & load
with tempfile.TemporaryDirectory(prefix="generalized_linear_regression") as d:
glr.write().overwrite().save(d)
glr2 = GeneralizedLinearRegression.load(d)
self.assertEqual(str(glr), str(glr2))
model.write().overwrite().save(d)
model2 = GeneralizedLinearRegressionModel.load(d)
self.assertEqual(str(model), str(model2))
def test_factorization_machine(self):
spark = self.spark
df = (
spark.createDataFrame(
[
(1, 1.0, Vectors.dense(0.0, 0.0)),
(2, 1.0, Vectors.dense(1.0, 2.0)),
(3, 2.0, Vectors.dense(0.0, 0.0)),
(4, 2.0, Vectors.dense(1.0, 1.0)),
],
["index", "label", "features"],
)
.coalesce(1)
.sortWithinPartitions("index")
.select("label", "features")
)
fm = FMRegressor(factorSize=2, maxIter=1, regParam=1.0, seed=1)
self.assertEqual(fm.getFactorSize(), 2)
self.assertEqual(fm.getMaxIter(), 1)
self.assertEqual(fm.getRegParam(), 1.0)
self.assertEqual(fm.getSeed(), 1)
model = fm.fit(df)
self.assertEqual(fm.uid, model.uid)
self.assertEqual(model.numFeatures, 2)
self.assertTrue(
np.allclose(model.intercept, 0.9999999966668874, atol=1e-4), model.intercept
)
self.assertTrue(
np.allclose(
model.linear.toArray(), [0.9999999933342161, 0.9999999950008276], atol=1e-4
),
model.linear,
)
self.assertTrue(
np.allclose(
model.factors.toArray(),
[[-0.99999954, -0.9999992], [0.99999968, -0.99999918]],
atol=1e-4,
),
model.factors,
)
vec = Vectors.dense(0.0, 5.0)
pred = model.predict(vec)
self.assertTrue(np.allclose(pred, 5.999999971671025, atol=1e-4), pred)
output = model.transform(df)
expected_cols = ["label", "features", "prediction"]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 4)
# Model save & load
with tempfile.TemporaryDirectory(prefix="factorization_machine") as d:
fm.write().overwrite().save(d)
fm2 = FMRegressor.load(d)
self.assertEqual(str(fm), str(fm2))
model.write().overwrite().save(d)
model2 = FMRegressionModel.load(d)
self.assertEqual(str(model), str(model2))
def test_decision_tree_regressor(self):
spark = self.spark
df = (
spark.createDataFrame(
[
(1.0, 1.0, Vectors.dense(0.0, 5.0)),
(0.0, 2.0, Vectors.dense(1.0, 2.0)),
(1.5, 3.0, Vectors.dense(2.0, 1.0)),
(0.7, 4.0, Vectors.dense(1.5, 3.0)),
],
["label", "weight", "features"],
)
.coalesce(1)
.sortWithinPartitions("weight")
)
dt = DecisionTreeRegressor(
maxDepth=2,
labelCol="label",
leafCol="leaf",
seed=1,
)
self.assertEqual(dt.getMaxDepth(), 2)
self.assertEqual(dt.getSeed(), 1)
self.assertEqual(dt.getLabelCol(), "label")
self.assertEqual(dt.getLeafCol(), "leaf")
model = dt.fit(df)
self.assertEqual(dt.uid, model.uid)
self.assertEqual(model.numFeatures, 2)
self.assertEqual(model.depth, 2)
self.assertEqual(model.numNodes, 5)
featureImportances = model.featureImportances
self.assertTrue(
np.allclose(featureImportances, [0.5756, 0.4244], atol=1e-4),
featureImportances,
)
debugString = model.toDebugString
self.assertTrue("depth=2, numNodes=5, numFeatures=2" in debugString, debugString)
self.assertTrue("If (feature 0 <= 1.75)" in debugString, debugString)
vec = Vectors.dense(0.0, 5.0)
self.assertTrue(np.allclose(model.predict(vec), 0.85, atol=1e-4))
self.assertEqual(model.predictLeaf(vec), 1.0)
output = model.transform(df)
expected_cols = [
"label",
"weight",
"features",
"prediction",
"leaf",
]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 4)
# Model save & load
with tempfile.TemporaryDirectory(prefix="decision_tree_regression") as d:
dt.write().overwrite().save(d)
dt2 = DecisionTreeRegressor.load(d)
self.assertEqual(str(dt), str(dt2))
model.write().overwrite().save(d)
model2 = DecisionTreeRegressionModel.load(d)
self.assertEqual(str(model), str(model2))
self.assertEqual(model.toDebugString, model2.toDebugString)
def test_gbt_regressor(self):
spark = self.spark
df = (
spark.createDataFrame(
[
(1.0, 1.0, Vectors.dense(0.0, 5.0)),
(0.0, 2.0, Vectors.dense(1.0, 2.0)),
(1.5, 3.0, Vectors.dense(2.0, 1.0)),
(0.7, 4.0, Vectors.dense(1.5, 3.0)),
],
["label", "weight", "features"],
)
.coalesce(1)
.sortWithinPartitions("weight")
)
gbt = GBTRegressor(
maxIter=3,
maxDepth=2,
labelCol="label",
leafCol="leaf",
seed=1,
)
self.assertEqual(gbt.getMaxIter(), 3)
self.assertEqual(gbt.getMaxDepth(), 2)
self.assertEqual(gbt.getSeed(), 1)
self.assertEqual(gbt.getLabelCol(), "label")
self.assertEqual(gbt.getLeafCol(), "leaf")
model = gbt.fit(df)
self.assertEqual(gbt.uid, model.uid)
self.assertEqual(model.numFeatures, 2)
self.assertEqual(model.treeWeights, [1.0, 0.1, 0.1])
self.assertEqual(model.totalNumNodes, 15)
featureImportances = model.featureImportances
self.assertTrue(
np.allclose(featureImportances, [0.5944156994359766, 0.4055843005640234], atol=1e-4),
featureImportances,
)
debugString = model.toDebugString
self.assertTrue("numTrees=3, numFeatures=2" in debugString, debugString)
self.assertTrue("If (feature 0 <= 1.75)" in debugString, debugString)
vec = Vectors.dense(0.0, 5.0)
self.assertTrue(np.allclose(model.predict(vec), 0.904, atol=1e-4))
self.assertEqual(model.predictLeaf(vec), Vectors.dense(1.0, 0.0, 0.0))
# GBT-specific method: evaluateEachIteration
self.assertTrue(
np.allclose(
model.evaluateEachIteration(df, "squared"),
[0.011250000000000003, 0.0072, 0.0046079999999999975],
atol=1e-4,
)
)
self.assertTrue(
np.allclose(
model.evaluateEachIteration(df, "absolute"),
[0.07500000000000007, 0.06000000000000006, 0.048000000000000057],
atol=1e-4,
)
)
output = model.transform(df)
expected_cols = [
"label",
"weight",
"features",
"prediction",
"leaf",
]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 4)
trees = model.trees
self.assertEqual(len(trees), 3)
for tree in trees:
self.assertIsInstance(tree, DecisionTreeRegressionModel)
self.assertTrue(tree.predict(vec) > -10)
self.assertEqual(tree.transform(df).count(), 4)
self.assertEqual(
tree.transform(df).columns,
["label", "weight", "features", "prediction", "leaf"],
)
# save & load
with tempfile.TemporaryDirectory(prefix="gbt_regression") as d:
gbt.write().overwrite().save(d)
gbt2 = GBTRegressor.load(d)
self.assertEqual(str(gbt), str(gbt2))
model.write().overwrite().save(d)
model2 = GBTRegressionModel.load(d)
self.assertEqual(str(model), str(model2))
self.assertEqual(model.toDebugString, model2.toDebugString)
def test_random_forest_regressor(self):
spark = self.spark
df = (
spark.createDataFrame(
[
(1.0, 1.0, Vectors.dense(0.0, 5.0)),
(0.0, 2.0, Vectors.dense(1.0, 2.0)),
(1.5, 3.0, Vectors.dense(2.0, 1.0)),
(0.7, 4.0, Vectors.dense(1.5, 3.0)),
],
["label", "weight", "features"],
)
.coalesce(1)
.sortWithinPartitions("weight")
)
rf = RandomForestRegressor(
numTrees=3,
maxDepth=2,
labelCol="label",
leafCol="leaf",
seed=1,
)
self.assertEqual(rf.getNumTrees(), 3)
self.assertEqual(rf.getMaxDepth(), 2)
self.assertEqual(rf.getSeed(), 1)
self.assertEqual(rf.getLabelCol(), "label")
self.assertEqual(rf.getLeafCol(), "leaf")
model = rf.fit(df)
self.assertEqual(rf.uid, model.uid)
self.assertEqual(model.numFeatures, 2)
self.assertEqual(model.treeWeights, [1.0, 1.0, 1.0])
self.assertEqual(model.totalNumNodes, 11)
featureImportances = model.featureImportances
self.assertTrue(
np.allclose(featureImportances, [0.5615222294986538, 0.43847777050134623], atol=1e-4),
featureImportances,
)
debugString = model.toDebugString
self.assertTrue("numTrees=3, numFeatures=2" in debugString, debugString)
self.assertTrue("If (feature 0 <= 1.75)" in debugString, debugString)
vec = Vectors.dense(0.0, 5.0)
self.assertTrue(np.allclose(model.predict(vec), 0.6166666666666667, atol=1e-4))
self.assertEqual(model.predictLeaf(vec), Vectors.dense(1.0, 0.0, 1.0))
output = model.transform(df)
expected_cols = [
"label",
"weight",
"features",
"prediction",
"leaf",
]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 4)
trees = model.trees
self.assertEqual(len(trees), 3)
for tree in trees:
self.assertIsInstance(tree, DecisionTreeRegressionModel)
self.assertTrue(tree.predict(vec) > -10)
self.assertEqual(tree.transform(df).count(), 4)
self.assertEqual(
tree.transform(df).columns,
["label", "weight", "features", "prediction", "leaf"],
)
with tempfile.TemporaryDirectory(prefix="random_forest_regression") as d:
rf.write().overwrite().save(d)
rf2 = RandomForestRegressor.load(d)
self.assertEqual(str(rf), str(rf2))
model.write().overwrite().save(d)
model2 = RandomForestRegressionModel.load(d)
self.assertEqual(str(model), str(model2))
self.assertEqual(model.toDebugString, model2.toDebugString)
class RegressionTests(RegressionTestsMixin, ReusedSQLTestCase):
pass
if __name__ == "__main__":
from pyspark.ml.tests.test_regression import * # noqa: F401,F403
try:
import xmlrunner # type: ignore[import]
testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)