| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import os |
| import tempfile |
| from shutil import rmtree |
| import unittest |
| |
| from numpy import array, array_equal |
| from py4j.protocol import Py4JJavaError |
| |
| from pyspark.mllib.fpm import FPGrowth |
| from pyspark.mllib.recommendation import Rating |
| from pyspark.mllib.regression import LabeledPoint |
| from pyspark.serializers import CPickleSerializer |
| from pyspark.testing.mllibutils import MLlibTestCase |
| from pyspark.testing.utils import eventually |
| |
| |
class ListTests(MLlibTestCase):
    """
    Test MLlib algorithms on plain lists, to make sure they're passed through
    as NumPy arrays.
    """
| |
| def test_bisecting_kmeans(self): |
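        # Four well-separated 2-D points trained with k=4: check that
        # predict() and computeCost() agree between the single-vector and
        # RDD code paths, and that the model exposes one center per cluster.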
| from pyspark.mllib.clustering import BisectingKMeans |
| |
| data = array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0]).reshape(4, 2) |
| bskm = BisectingKMeans() |
| model = bskm.train(self.sc.parallelize(data, 2), k=4) |
| p = array([0.0, 0.0]) |
| rdd_p = self.sc.parallelize([p]) |
| self.assertEqual(model.predict(p), model.predict(rdd_p).first()) |
| self.assertEqual(model.computeCost(p), model.computeCost(rdd_p)) |
| self.assertEqual(model.k, len(model.clusterCenters)) |
| |
| def test_kmeans(self): |
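        # Two tight groups of two points each; with k=2, points from the
        # same group should be assigned to the same cluster.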
| from pyspark.mllib.clustering import KMeans |
| |
| data = [ |
| [0, 1.1], |
| [0, 1.2], |
| [1.1, 0], |
| [1.2, 0], |
| ] |
| clusters = KMeans.train( |
| self.sc.parallelize(data), |
| 2, |
| initializationMode="k-means||", |
| initializationSteps=7, |
| epsilon=1e-4, |
| ) |
| self.assertEqual(clusters.predict(data[0]), clusters.predict(data[1])) |
| self.assertEqual(clusters.predict(data[2]), clusters.predict(data[3])) |
| |
| def test_kmeans_deterministic(self): |
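        # Two k-means|| runs with identical data, parameters, and seed
        # should produce identical cluster centers.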
| from pyspark.mllib.clustering import KMeans |
| |
| X = range(0, 100, 10) |
| Y = range(0, 100, 10) |
| data = [[x, y] for x, y in zip(X, Y)] |
| clusters1 = KMeans.train( |
| self.sc.parallelize(data), |
| 3, |
| initializationMode="k-means||", |
| seed=42, |
| initializationSteps=7, |
| epsilon=1e-4, |
| ) |
| clusters2 = KMeans.train( |
| self.sc.parallelize(data), |
| 3, |
| initializationMode="k-means||", |
| seed=42, |
| initializationSteps=7, |
| epsilon=1e-4, |
| ) |
| centers1 = clusters1.centers |
| centers2 = clusters2.centers |
| for c1, c2 in zip(centers1, centers2): |
| # TODO: Allow small numeric difference. |
| self.assertTrue(array_equal(c1, c2)) |
| |
| @eventually(timeout=60, catch_assertions=True) |
| def test_gmm(self): |
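        # Two well-separated pairs of points fit with a two-component
        # mixture: each pair should map to the same component. @eventually
        # retries the whole test on assertion failure, presumably to guard
        # against an occasional poor EM fit within the iteration budget.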
| from pyspark.mllib.clustering import GaussianMixture |
| |
| data = self.sc.parallelize( |
| [ |
| [1, 2], |
| [8, 9], |
| [-4, -3], |
| [-6, -7], |
| ] |
| ) |
| clusters = GaussianMixture.train( |
| data, |
| 2, |
| convergenceTol=0.001, |
| maxIterations=10, |
| seed=1, |
| ) |
| labels = clusters.predict(data).collect() |
| self.assertEqual(labels[0], labels[1]) |
| self.assertEqual(labels[2], labels[3]) |
| |
| def test_gmm_deterministic(self): |
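        # Two GMM runs with identical data, parameters, and seed should
        # produce component weights that agree to 7 decimal places.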
| from pyspark.mllib.clustering import GaussianMixture |
| |
| x = range(0, 100, 10) |
| y = range(0, 100, 10) |
| data = self.sc.parallelize([[a, b] for a, b in zip(x, y)]) |
| clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001, maxIterations=10, seed=63) |
| clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001, maxIterations=10, seed=63) |
| for c1, c2 in zip(clusters1.weights, clusters2.weights): |
| self.assertEqual(round(c1, 7), round(c2, 7)) |
| |
| def test_gmm_with_initial_model(self): |
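        # Warm-starting training from an already-fitted model should leave
        # the component weights essentially unchanged.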
| from pyspark.mllib.clustering import GaussianMixture |
| |
| data = self.sc.parallelize([(-10, -5), (-9, -4), (10, 5), (9, 4)]) |
| |
| gmm1 = GaussianMixture.train(data, 2, convergenceTol=0.001, maxIterations=10, seed=63) |
| gmm2 = GaussianMixture.train( |
| data, 2, convergenceTol=0.001, maxIterations=10, seed=63, initialModel=gmm1 |
| ) |
| self.assertAlmostEqual((gmm1.weights - gmm2.weights).sum(), 0.0) |
| |
| def test_classification(self): |
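        # Train each MLlib classifier on a tiny separable dataset, check
        # which side of the decision boundary each point falls on, and
        # round-trip the tree-based models through save()/load().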
| from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes |
| from pyspark.mllib.tree import ( |
| DecisionTree, |
| DecisionTreeModel, |
| RandomForest, |
| RandomForestModel, |
| GradientBoostedTrees, |
| GradientBoostedTreesModel, |
| ) |
| |
| data = [ |
| LabeledPoint(0.0, [1, 0, 0]), |
| LabeledPoint(1.0, [0, 1, 1]), |
| LabeledPoint(0.0, [2, 0, 0]), |
| LabeledPoint(1.0, [0, 2, 1]), |
| ] |
| rdd = self.sc.parallelize(data) |
| features = [p.features.tolist() for p in data] |
| |
| temp_dir = tempfile.mkdtemp() |
| |
| lr_model = LogisticRegressionWithSGD.train(rdd, iterations=10) |
| self.assertTrue(lr_model.predict(features[0]) <= 0) |
| self.assertTrue(lr_model.predict(features[1]) > 0) |
| self.assertTrue(lr_model.predict(features[2]) <= 0) |
| self.assertTrue(lr_model.predict(features[3]) > 0) |
| |
| svm_model = SVMWithSGD.train(rdd, iterations=10) |
| self.assertTrue(svm_model.predict(features[0]) <= 0) |
| self.assertTrue(svm_model.predict(features[1]) > 0) |
| self.assertTrue(svm_model.predict(features[2]) <= 0) |
| self.assertTrue(svm_model.predict(features[3]) > 0) |
| |
| nb_model = NaiveBayes.train(rdd) |
| self.assertTrue(nb_model.predict(features[0]) <= 0) |
| self.assertTrue(nb_model.predict(features[1]) > 0) |
| self.assertTrue(nb_model.predict(features[2]) <= 0) |
| self.assertTrue(nb_model.predict(features[3]) > 0) |
| |
| categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories |
| dt_model = DecisionTree.trainClassifier( |
| rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4 |
| ) |
| self.assertTrue(dt_model.predict(features[0]) <= 0) |
| self.assertTrue(dt_model.predict(features[1]) > 0) |
| self.assertTrue(dt_model.predict(features[2]) <= 0) |
| self.assertTrue(dt_model.predict(features[3]) > 0) |
| |
| dt_model_dir = os.path.join(temp_dir, "dt") |
| dt_model.save(self.sc, dt_model_dir) |
| same_dt_model = DecisionTreeModel.load(self.sc, dt_model_dir) |
| self.assertEqual(same_dt_model.toDebugString(), dt_model.toDebugString()) |
| |
| rf_model = RandomForest.trainClassifier( |
| rdd, |
| numClasses=2, |
| categoricalFeaturesInfo=categoricalFeaturesInfo, |
| numTrees=10, |
| maxBins=4, |
| seed=1, |
| ) |
| self.assertTrue(rf_model.predict(features[0]) <= 0) |
| self.assertTrue(rf_model.predict(features[1]) > 0) |
| self.assertTrue(rf_model.predict(features[2]) <= 0) |
| self.assertTrue(rf_model.predict(features[3]) > 0) |
| |
| rf_model_dir = os.path.join(temp_dir, "rf") |
| rf_model.save(self.sc, rf_model_dir) |
| same_rf_model = RandomForestModel.load(self.sc, rf_model_dir) |
| self.assertEqual(same_rf_model.toDebugString(), rf_model.toDebugString()) |
| |
| gbt_model = GradientBoostedTrees.trainClassifier( |
| rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4 |
| ) |
| self.assertTrue(gbt_model.predict(features[0]) <= 0) |
| self.assertTrue(gbt_model.predict(features[1]) > 0) |
| self.assertTrue(gbt_model.predict(features[2]) <= 0) |
| self.assertTrue(gbt_model.predict(features[3]) > 0) |
| |
| gbt_model_dir = os.path.join(temp_dir, "gbt") |
| gbt_model.save(self.sc, gbt_model_dir) |
| same_gbt_model = GradientBoostedTreesModel.load(self.sc, gbt_model_dir) |
| self.assertEqual(same_gbt_model.toDebugString(), gbt_model.toDebugString()) |
| |
| try: |
| rmtree(temp_dir) |
| except OSError: |
| pass |
| |
| def test_regression(self): |
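        # Same pattern as test_classification, but for regressors: the sign
        # of each prediction should match the sign of the training label.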
| from pyspark.mllib.regression import ( |
| LinearRegressionWithSGD, |
| LassoWithSGD, |
| RidgeRegressionWithSGD, |
| ) |
| from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees |
| |
| data = [ |
| LabeledPoint(-1.0, [0, -1]), |
| LabeledPoint(1.0, [0, 1]), |
| LabeledPoint(-1.0, [0, -2]), |
| LabeledPoint(1.0, [0, 2]), |
| ] |
| rdd = self.sc.parallelize(data) |
| features = [p.features.tolist() for p in data] |
| |
| lr_model = LinearRegressionWithSGD.train(rdd, iterations=10) |
| self.assertTrue(lr_model.predict(features[0]) <= 0) |
| self.assertTrue(lr_model.predict(features[1]) > 0) |
| self.assertTrue(lr_model.predict(features[2]) <= 0) |
| self.assertTrue(lr_model.predict(features[3]) > 0) |
| |
| lasso_model = LassoWithSGD.train(rdd, iterations=10) |
| self.assertTrue(lasso_model.predict(features[0]) <= 0) |
| self.assertTrue(lasso_model.predict(features[1]) > 0) |
| self.assertTrue(lasso_model.predict(features[2]) <= 0) |
| self.assertTrue(lasso_model.predict(features[3]) > 0) |
| |
| rr_model = RidgeRegressionWithSGD.train(rdd, iterations=10) |
| self.assertTrue(rr_model.predict(features[0]) <= 0) |
| self.assertTrue(rr_model.predict(features[1]) > 0) |
| self.assertTrue(rr_model.predict(features[2]) <= 0) |
| self.assertTrue(rr_model.predict(features[3]) > 0) |
| |
| categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories |
| dt_model = DecisionTree.trainRegressor( |
| rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4 |
| ) |
| self.assertTrue(dt_model.predict(features[0]) <= 0) |
| self.assertTrue(dt_model.predict(features[1]) > 0) |
| self.assertTrue(dt_model.predict(features[2]) <= 0) |
| self.assertTrue(dt_model.predict(features[3]) > 0) |
| |
| rf_model = RandomForest.trainRegressor( |
| rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10, maxBins=4, seed=1 |
| ) |
| self.assertTrue(rf_model.predict(features[0]) <= 0) |
| self.assertTrue(rf_model.predict(features[1]) > 0) |
| self.assertTrue(rf_model.predict(features[2]) <= 0) |
| self.assertTrue(rf_model.predict(features[3]) > 0) |
| |
| gbt_model = GradientBoostedTrees.trainRegressor( |
| rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4 |
| ) |
| self.assertTrue(gbt_model.predict(features[0]) <= 0) |
| self.assertTrue(gbt_model.predict(features[1]) > 0) |
| self.assertTrue(gbt_model.predict(features[2]) <= 0) |
| self.assertTrue(gbt_model.predict(features[3]) > 0) |
| |
| try: |
| LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10) |
| LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10) |
| RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10) |
| except ValueError: |
            self.fail("training with an explicit initialWeights vector raised ValueError")
| |
| # Verify that maxBins is being passed through |
| GradientBoostedTrees.trainRegressor( |
| rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4, maxBins=32 |
| ) |
| with self.assertRaises(Exception): |
| GradientBoostedTrees.trainRegressor( |
| rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4, maxBins=1 |
| ) |
| |
| |
| class ALSTests(MLlibTestCase): |
| def test_als_ratings_serialize(self): |
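        # Round-trip a Rating: Python pickle -> JVM SerDe.loads -> SerDe.dumps
        # -> Python unpickle, then check that all fields survive.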
| ser = CPickleSerializer() |
| r = Rating(7, 1123, 3.14) |
| jr = self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads(bytearray(ser.dumps(r))) |
| nr = ser.loads(bytes(self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.dumps(jr))) |
| self.assertEqual(r.user, nr.user) |
| self.assertEqual(r.product, nr.product) |
| self.assertAlmostEqual(r.rating, nr.rating, 2) |
| |
| def test_als_ratings_id_long_error(self): |
| ser = CPickleSerializer() |
| r = Rating(1205640308657491975, 50233468418, 1.0) |
        # user and product ids exceed the max Java int value, so deserializing
        # the pickled Rating on the JVM side should fail
| self.assertRaises( |
| Py4JJavaError, |
| self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads, |
| bytearray(ser.dumps(r)), |
| ) |
| |
| |
| class FPGrowthTest(MLlibTestCase): |
| def test_fpgrowth(self): |
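        # The frequent itemsets found should not depend on whether
        # numPartitions is given explicitly or left to its default.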
| data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]] |
| rdd = self.sc.parallelize(data, 2) |
| model1 = FPGrowth.train(rdd, 0.6, 2) |
        # numPartitions defaults to the input RDD's partition count when not specified
| model2 = FPGrowth.train(rdd, 0.6) |
| self.assertEqual( |
| sorted(model1.freqItemsets().collect()), sorted(model2.freqItemsets().collect()) |
| ) |
| |
| |
| if __name__ == "__main__": |
| from pyspark.mllib.tests.test_algorithms import * # noqa: F401 |
| |
| try: |
| import xmlrunner |
| |
| testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) |
| except ImportError: |
| testRunner = None |
| unittest.main(testRunner=testRunner, verbosity=2) |