blob: 48283d341fabf912a0fdd86eafe677f2fe2b7fe7 [file]
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import tempfile
import unittest
import numpy as np
from pyspark.util import is_remote_only
from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
from pyspark.testing.utils import have_torch, torch_requirement_message
from pyspark.testing.sqlutils import ReusedSQLTestCase
if should_test_connect:
from pyspark.ml.connect.classification import (
LogisticRegression as LORV2,
LogisticRegressionModel as LORV2Model,
)
import pandas as pd
class ClassificationTestsMixin:
    """Shared test cases for the torch-backed ``LogisticRegression`` estimator/model.

    The concrete test class is expected to provide a ``spark`` attribute, which
    is used to build the training and evaluation DataFrames.
    """

    @staticmethod
    def _check_result(result_dataframe, expected_predictions, expected_probabilities=None):
        """Assert that a transformed pandas DataFrame matches the expected output.

        Parameters
        ----------
        result_dataframe
            pandas DataFrame holding a ``prediction`` column and, optionally,
            a ``probability`` column.
        expected_predictions
            Expected values of the ``prediction`` column (compared exactly).
        expected_probabilities
            Expected values of the ``probability`` column, compared with a
            relative tolerance of ``1e-1``. Pass ``None`` when the result is
            not supposed to carry a ``probability`` column.

        Raises
        ------
        AssertionError
            If the predictions or probabilities do not match, or if
            probabilities were expected but the result has no ``probability``
            column.
        """
        np.testing.assert_array_equal(list(result_dataframe.prediction), expected_predictions)

        if "probability" not in result_dataframe.columns:
            # Do not silently skip the comparison: a caller that supplied
            # expected probabilities wants them to be verified.
            if expected_probabilities is not None:
                raise AssertionError(
                    "expected probabilities were provided, but the result has no "
                    "'probability' column"
                )
            return

        np.testing.assert_allclose(
            list(result_dataframe.probability),
            expected_probabilities,
            rtol=1e-1,
        )

    def test_binary_classes_logistic_regression(self):
        # Fits a two-class model and checks transform() on both a Spark
        # DataFrame and a pandas DataFrame, with and without a probability column.
        df1 = self.spark.createDataFrame(
            [
                (1.0, [0.0, 5.0]),
                (0.0, [1.0, 2.0]),
                (1.0, [2.0, 1.0]),
                (0.0, [3.0, 3.0]),
            ]
            * 100,
            ["label", "features"],
        )
        eval_df1 = self.spark.createDataFrame(
            [
                ([0.0, 2.0],),
                ([3.5, 3.0],),
            ],
            ["features"],
        )

        lorv2 = LORV2(maxIter=200, numTrainWorkers=2, learningRate=0.001)
        assert lorv2.getMaxIter() == 200
        assert lorv2.getNumTrainWorkers() == 2
        assert lorv2.getOrDefault(lorv2.learningRate) == 0.001

        model = lorv2.fit(df1)
        assert model.uid == lorv2.uid

        expected_predictions = [1, 0]
        expected_probabilities = [
            [0.217875, 0.782125],
            [0.839615, 0.160385],
        ]

        # Spark DataFrame input.
        result = model.transform(eval_df1).toPandas()
        self._check_result(result, expected_predictions, expected_probabilities)

        # pandas DataFrame input.
        pandas_eval_df1 = eval_df1.toPandas()
        pandas_eval_df1_copy = pandas_eval_df1.copy()
        local_transform_result = model.transform(pandas_eval_df1)
        # assert that `transform` doesn't mutate the input dataframe.
        pd.testing.assert_frame_equal(pandas_eval_df1, pandas_eval_df1_copy)
        self._check_result(local_transform_result, expected_predictions, expected_probabilities)

        # With an empty probabilityCol, no probability column must be produced.
        model.set(model.probabilityCol, "")
        result_without_prob = model.transform(eval_df1).toPandas()
        assert "probability" not in result_without_prob.columns
        self._check_result(result_without_prob, expected_predictions, None)

    def test_multi_classes_logistic_regression(self):
        # Fits a three-class model and checks transform() on both a Spark
        # DataFrame and a pandas DataFrame.
        df1 = self.spark.createDataFrame(
            [
                (1.0, [1.0, 5.0]),
                (2.0, [1.0, -2.0]),
                (0.0, [-2.0, 1.5]),
            ]
            * 100,
            ["label", "features"],
        )
        eval_df1 = self.spark.createDataFrame(
            [
                ([1.5, 5.0],),
                ([1.0, -2.5],),
                ([-2.0, 1.0],),
            ],
            ["features"],
        )

        lorv2 = LORV2(maxIter=200, numTrainWorkers=2, learningRate=0.001)
        model = lorv2.fit(df1)

        expected_predictions = [1, 2, 0]
        expected_probabilities = [
            [5.526459e-03, 9.943553e-01, 1.183146e-04],
            [4.629959e-03, 8.141352e-03, 9.872288e-01],
            [9.624363e-01, 3.080821e-02, 6.755549e-03],
        ]

        result = model.transform(eval_df1).toPandas()
        self._check_result(result, expected_predictions, expected_probabilities)

        local_transform_result = model.transform(eval_df1.toPandas())
        self._check_result(local_transform_result, expected_predictions, expected_probabilities)

    def test_save_load(self):
        # Round-trips the estimator and the fitted model through
        # saveToLocal/loadFromLocal and save/load.
        import torch

        with tempfile.TemporaryDirectory(prefix="test_save_load") as tmp_dir:
            # Register the torch module classes listed below as safe globals
            # for the duration of the save/load round trips.
            with torch.serialization.safe_globals(
                [
                    torch.nn.modules.container.Sequential,
                    torch.nn.modules.linear.Linear,
                    torch.nn.modules.activation.Softmax,
                ]
            ):
                estimator = LORV2(maxIter=2, numTrainWorkers=2, learningRate=0.001)

                # Estimator: saveToLocal / loadFromLocal.
                local_path = os.path.join(tmp_dir, "estimator")
                estimator.saveToLocal(local_path)
                loaded_estimator = LORV2.loadFromLocal(local_path)
                assert loaded_estimator.uid == estimator.uid
                assert loaded_estimator.getOrDefault(loaded_estimator.maxIter) == 2
                assert loaded_estimator.getOrDefault(loaded_estimator.numTrainWorkers) == 2
                assert loaded_estimator.getOrDefault(loaded_estimator.learningRate) == 0.001

                # test overwriting
                estimator2 = estimator.copy()
                estimator2.set(estimator2.maxIter, 10)
                estimator2.saveToLocal(local_path, overwrite=True)
                loaded_estimator2 = LORV2.loadFromLocal(local_path)
                assert loaded_estimator2.getOrDefault(loaded_estimator2.maxIter) == 10

                # Estimator: save / load.
                fs_path = os.path.join(tmp_dir, "fs", "estimator")
                estimator.save(fs_path)
                loaded_estimator = LORV2.load(fs_path)
                assert loaded_estimator.uid == estimator.uid
                assert loaded_estimator.getOrDefault(loaded_estimator.maxIter) == 2
                assert loaded_estimator.getOrDefault(loaded_estimator.numTrainWorkers) == 2
                assert loaded_estimator.getOrDefault(loaded_estimator.learningRate) == 0.001

                training_dataset = self.spark.createDataFrame(
                    [
                        (1.0, [0.0, 5.0]),
                        (0.0, [1.0, 2.0]),
                        (1.0, [2.0, 1.0]),
                        (0.0, [3.0, 3.0]),
                    ]
                    * 100,
                    ["label", "features"],
                )
                eval_df1 = self.spark.createDataFrame(
                    [
                        ([0.0, 2.0],),
                        ([3.5, 3.0],),
                    ],
                    ["features"],
                )

                model = estimator.fit(training_dataset)
                model_predictions = model.transform(eval_df1.toPandas())
                assert model.uid == estimator.uid

                # Model: saveToLocal / loadFromLocal.
                local_model_path = os.path.join(tmp_dir, "model")
                model.saveToLocal(local_model_path)

                # test saved torch model can be loaded by pytorch solely
                lor_torch_model = torch.load(
                    os.path.join(local_model_path, "LogisticRegressionModel.torch"),
                    weights_only=False,
                )

                with torch.inference_mode():
                    torch_infer_result = lor_torch_model(
                        torch.tensor(
                            np.stack(list(eval_df1.toPandas().features)), dtype=torch.float32
                        )
                    ).numpy()

                # The raw torch output must agree with the probabilities
                # produced by the model's own transform().
                np.testing.assert_allclose(
                    np.stack(list(model_predictions.probability)),
                    torch_infer_result,
                    rtol=1e-4,
                )

                loaded_model = LORV2Model.loadFromLocal(local_model_path)
                assert loaded_model.numFeatures == 2
                assert loaded_model.numClasses == 2
                assert loaded_model.getOrDefault(loaded_model.maxIter) == 2
                assert loaded_model.torch_model is not None
                np.testing.assert_allclose(
                    loaded_model.torch_model.weight.detach().numpy(),
                    model.torch_model.weight.detach().numpy(),
                )
                np.testing.assert_allclose(
                    loaded_model.torch_model.bias.detach().numpy(),
                    model.torch_model.bias.detach().numpy(),
                )

                # Test loaded model transformation.
                loaded_model.transform(eval_df1.toPandas())

                # Model: save / load.
                fs_model_path = os.path.join(tmp_dir, "fs", "model")
                model.save(fs_model_path)
                loaded_model = LORV2Model.load(fs_model_path)
                assert loaded_model.numFeatures == 2
                assert loaded_model.numClasses == 2
                assert loaded_model.getOrDefault(loaded_model.maxIter) == 2
                assert loaded_model.torch_model is not None

                # Test loaded model transformation works.
                loaded_model.transform(eval_df1.toPandas())
@unittest.skipIf(
    # Skip unless both Spark Connect and torch are usable; also skip when
    # running in a remote-only environment.
    not (should_test_connect and have_torch) or is_remote_only(),
    connect_requirement_message
    or torch_requirement_message
    or "pyspark-connect cannot test classic Spark",
)
class ClassificationTests(ClassificationTestsMixin, ReusedSQLTestCase):
    """Runs the shared classification test cases on top of ``ReusedSQLTestCase``."""

    @classmethod
    def master(cls):
        """Return the master URL to use for this test class."""
        return "local[2]"
# Script entry point: hand control to pyspark's test runner.
if __name__ == "__main__":
    from pyspark.testing import main as run_tests

    run_tests()