#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import tempfile
from shutil import rmtree
import unittest

from numpy import array, array_equal
from py4j.protocol import Py4JJavaError

from pyspark.mllib.fpm import FPGrowth
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.regression import LabeledPoint
from pyspark.serializers import CPickleSerializer
from pyspark.testing.mllibutils import MLlibTestCase
from pyspark.testing.utils import eventually


class ListTests(MLlibTestCase):
    """
    Test MLlib algorithms on plain lists, to make sure they're passed through
    as NumPy arrays.
    """

    def test_bisecting_kmeans(self):
        from pyspark.mllib.clustering import BisectingKMeans

        data = array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0]).reshape(4, 2)
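        # Four well-separated points trained with k=4, so each point should
        # end up in its own cluster.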
        bskm = BisectingKMeans()
        model = bskm.train(self.sc.parallelize(data, 2), k=4)
        p = array([0.0, 0.0])
        rdd_p = self.sc.parallelize([p])
        self.assertEqual(model.predict(p), model.predict(rdd_p).first())
        self.assertEqual(model.computeCost(p), model.computeCost(rdd_p))
        self.assertEqual(model.k, len(model.clusterCenters))

    def test_kmeans(self):
        from pyspark.mllib.clustering import KMeans

        data = [
            [0, 1.1],
            [0, 1.2],
            [1.1, 0],
            [1.2, 0],
        ]
        clusters = KMeans.train(
            self.sc.parallelize(data),
            2,
            initializationMode="k-means||",
            initializationSteps=7,
            epsilon=1e-4,
        )
        self.assertEqual(clusters.predict(data[0]), clusters.predict(data[1]))
        self.assertEqual(clusters.predict(data[2]), clusters.predict(data[3]))

    def test_kmeans_deterministic(self):
        from pyspark.mllib.clustering import KMeans

        X = range(0, 100, 10)
        Y = range(0, 100, 10)
        data = [[x, y] for x, y in zip(X, Y)]
        clusters1 = KMeans.train(
            self.sc.parallelize(data),
            3,
            initializationMode="k-means||",
            seed=42,
            initializationSteps=7,
            epsilon=1e-4,
        )
        clusters2 = KMeans.train(
            self.sc.parallelize(data),
            3,
            initializationMode="k-means||",
            seed=42,
            initializationSteps=7,
            epsilon=1e-4,
        )
        centers1 = clusters1.centers
        centers2 = clusters2.centers
        for c1, c2 in zip(centers1, centers2):
            # TODO: Allow small numeric difference.
            self.assertTrue(array_equal(c1, c2))

    # GMM training can occasionally mis-cluster, so retry on assertion failure.
    @eventually(timeout=60, catch_assertions=True)
    def test_gmm(self):
        from pyspark.mllib.clustering import GaussianMixture

        data = self.sc.parallelize(
            [
                [1, 2],
                [8, 9],
                [-4, -3],
                [-6, -7],
            ]
        )
        clusters = GaussianMixture.train(
            data,
            2,
            convergenceTol=0.001,
            maxIterations=10,
            seed=1,
        )
        labels = clusters.predict(data).collect()
        self.assertEqual(labels[0], labels[1])
        self.assertEqual(labels[2], labels[3])

    def test_gmm_deterministic(self):
        from pyspark.mllib.clustering import GaussianMixture

        x = range(0, 100, 10)
        y = range(0, 100, 10)
        data = self.sc.parallelize([[a, b] for a, b in zip(x, y)])
        clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001, maxIterations=10, seed=63)
        clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001, maxIterations=10, seed=63)
        for c1, c2 in zip(clusters1.weights, clusters2.weights):
            self.assertEqual(round(c1, 7), round(c2, 7))

    def test_gmm_with_initial_model(self):
        from pyspark.mllib.clustering import GaussianMixture

        data = self.sc.parallelize([(-10, -5), (-9, -4), (10, 5), (9, 4)])
        gmm1 = GaussianMixture.train(data, 2, convergenceTol=0.001, maxIterations=10, seed=63)
        gmm2 = GaussianMixture.train(
            data, 2, convergenceTol=0.001, maxIterations=10, seed=63, initialModel=gmm1
        )
        self.assertAlmostEqual((gmm1.weights - gmm2.weights).sum(), 0.0)

    def test_classification(self):
        from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
        from pyspark.mllib.tree import (
            DecisionTree,
            DecisionTreeModel,
            RandomForest,
            RandomForestModel,
            GradientBoostedTrees,
            GradientBoostedTreesModel,
        )

        data = [
            LabeledPoint(0.0, [1, 0, 0]),
            LabeledPoint(1.0, [0, 1, 1]),
            LabeledPoint(0.0, [2, 0, 0]),
            LabeledPoint(1.0, [0, 2, 1]),
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features.tolist() for p in data]
        temp_dir = tempfile.mkdtemp()
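
        # Each classifier below should separate the two classes: predictions
        # for the label-0 points stay at or below 0, label-1 points above 0.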
        lr_model = LogisticRegressionWithSGD.train(rdd, iterations=10)
        self.assertTrue(lr_model.predict(features[0]) <= 0)
        self.assertTrue(lr_model.predict(features[1]) > 0)
        self.assertTrue(lr_model.predict(features[2]) <= 0)
        self.assertTrue(lr_model.predict(features[3]) > 0)

        svm_model = SVMWithSGD.train(rdd, iterations=10)
        self.assertTrue(svm_model.predict(features[0]) <= 0)
        self.assertTrue(svm_model.predict(features[1]) > 0)
        self.assertTrue(svm_model.predict(features[2]) <= 0)
        self.assertTrue(svm_model.predict(features[3]) > 0)

        nb_model = NaiveBayes.train(rdd)
        self.assertTrue(nb_model.predict(features[0]) <= 0)
        self.assertTrue(nb_model.predict(features[1]) > 0)
        self.assertTrue(nb_model.predict(features[2]) <= 0)
        self.assertTrue(nb_model.predict(features[3]) > 0)
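
        # The tree-based models additionally exercise a save/load round-trip
        # through temp_dir, comparing debug strings for equality.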
        categoricalFeaturesInfo = {0: 3}  # feature 0 has 3 categories
        dt_model = DecisionTree.trainClassifier(
            rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4
        )
        self.assertTrue(dt_model.predict(features[0]) <= 0)
        self.assertTrue(dt_model.predict(features[1]) > 0)
        self.assertTrue(dt_model.predict(features[2]) <= 0)
        self.assertTrue(dt_model.predict(features[3]) > 0)
        dt_model_dir = os.path.join(temp_dir, "dt")
        dt_model.save(self.sc, dt_model_dir)
        same_dt_model = DecisionTreeModel.load(self.sc, dt_model_dir)
        self.assertEqual(same_dt_model.toDebugString(), dt_model.toDebugString())

        rf_model = RandomForest.trainClassifier(
            rdd,
            numClasses=2,
            categoricalFeaturesInfo=categoricalFeaturesInfo,
            numTrees=10,
            maxBins=4,
            seed=1,
        )
        self.assertTrue(rf_model.predict(features[0]) <= 0)
        self.assertTrue(rf_model.predict(features[1]) > 0)
        self.assertTrue(rf_model.predict(features[2]) <= 0)
        self.assertTrue(rf_model.predict(features[3]) > 0)
        rf_model_dir = os.path.join(temp_dir, "rf")
        rf_model.save(self.sc, rf_model_dir)
        same_rf_model = RandomForestModel.load(self.sc, rf_model_dir)
        self.assertEqual(same_rf_model.toDebugString(), rf_model.toDebugString())

        gbt_model = GradientBoostedTrees.trainClassifier(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4
        )
        self.assertTrue(gbt_model.predict(features[0]) <= 0)
        self.assertTrue(gbt_model.predict(features[1]) > 0)
        self.assertTrue(gbt_model.predict(features[2]) <= 0)
        self.assertTrue(gbt_model.predict(features[3]) > 0)
        gbt_model_dir = os.path.join(temp_dir, "gbt")
        gbt_model.save(self.sc, gbt_model_dir)
        same_gbt_model = GradientBoostedTreesModel.load(self.sc, gbt_model_dir)
        self.assertEqual(same_gbt_model.toDebugString(), gbt_model.toDebugString())

        try:
            rmtree(temp_dir)
        except OSError:
            pass

    def test_regression(self):
        from pyspark.mllib.regression import (
            LinearRegressionWithSGD,
            LassoWithSGD,
            RidgeRegressionWithSGD,
        )
        from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees

        data = [
            LabeledPoint(-1.0, [0, -1]),
            LabeledPoint(1.0, [0, 1]),
            LabeledPoint(-1.0, [0, -2]),
            LabeledPoint(1.0, [0, 2]),
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features.tolist() for p in data]
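
        # Each regressor below should recover the sign of the label, which
        # follows the sign of the second feature.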
        lr_model = LinearRegressionWithSGD.train(rdd, iterations=10)
        self.assertTrue(lr_model.predict(features[0]) <= 0)
        self.assertTrue(lr_model.predict(features[1]) > 0)
        self.assertTrue(lr_model.predict(features[2]) <= 0)
        self.assertTrue(lr_model.predict(features[3]) > 0)

        lasso_model = LassoWithSGD.train(rdd, iterations=10)
        self.assertTrue(lasso_model.predict(features[0]) <= 0)
        self.assertTrue(lasso_model.predict(features[1]) > 0)
        self.assertTrue(lasso_model.predict(features[2]) <= 0)
        self.assertTrue(lasso_model.predict(features[3]) > 0)

        rr_model = RidgeRegressionWithSGD.train(rdd, iterations=10)
        self.assertTrue(rr_model.predict(features[0]) <= 0)
        self.assertTrue(rr_model.predict(features[1]) > 0)
        self.assertTrue(rr_model.predict(features[2]) <= 0)
        self.assertTrue(rr_model.predict(features[3]) > 0)

        categoricalFeaturesInfo = {0: 2}  # feature 0 has 2 categories
        dt_model = DecisionTree.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4
        )
        self.assertTrue(dt_model.predict(features[0]) <= 0)
        self.assertTrue(dt_model.predict(features[1]) > 0)
        self.assertTrue(dt_model.predict(features[2]) <= 0)
        self.assertTrue(dt_model.predict(features[3]) > 0)

        rf_model = RandomForest.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10, maxBins=4, seed=1
        )
        self.assertTrue(rf_model.predict(features[0]) <= 0)
        self.assertTrue(rf_model.predict(features[1]) > 0)
        self.assertTrue(rf_model.predict(features[2]) <= 0)
        self.assertTrue(rf_model.predict(features[3]) > 0)

        gbt_model = GradientBoostedTrees.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4
        )
        self.assertTrue(gbt_model.predict(features[0]) <= 0)
        self.assertTrue(gbt_model.predict(features[1]) > 0)
        self.assertTrue(gbt_model.predict(features[2]) <= 0)
        self.assertTrue(gbt_model.predict(features[3]) > 0)
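
        # Passing explicit initial weights should be accepted by the linear
        # trainers without raising.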
        try:
            LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
            LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
            RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
        except ValueError:
            self.fail()

        # Verify that maxBins is being passed through
        GradientBoostedTrees.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4, maxBins=32
        )
        with self.assertRaises(Exception):
            GradientBoostedTrees.trainRegressor(
                rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4, maxBins=1
            )


class ALSTests(MLlibTestCase):
    def test_als_ratings_serialize(self):
        ser = CPickleSerializer()
        r = Rating(7, 1123, 3.14)
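        # Round-trip the Rating through the JVM: pickle in Python, unpickle
        # in Java, serialize in Java, and unpickle back in Python.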
        jr = self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads(bytearray(ser.dumps(r)))
        nr = ser.loads(bytes(self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.dumps(jr)))
        self.assertEqual(r.user, nr.user)
        self.assertEqual(r.product, nr.product)
        self.assertAlmostEqual(r.rating, nr.rating, 2)

    def test_als_ratings_id_long_error(self):
        ser = CPickleSerializer()
        r = Rating(1205640308657491975, 50233468418, 1.0)
        # user and product ids exceed the max int value, so loading the
        # pickled Rating on the Java side should fail
        self.assertRaises(
            Py4JJavaError,
            self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads,
            bytearray(ser.dumps(r)),
        )


class FPGrowthTest(MLlibTestCase):
    def test_fpgrowth(self):
        data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]]
        rdd = self.sc.parallelize(data, 2)
        model1 = FPGrowth.train(rdd, 0.6, 2)
        # use the default number of partitions when numPartitions is not specified
        model2 = FPGrowth.train(rdd, 0.6)
        self.assertEqual(
            sorted(model1.freqItemsets().collect()), sorted(model2.freqItemsets().collect())
        )


if __name__ == "__main__":
    from pyspark.mllib.tests.test_algorithms import *  # noqa: F401

    try:
        import xmlrunner

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)