#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import tempfile
import unittest

import numpy as np

from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import (
    KMeans,
    KMeansModel,
    KMeansSummary,
    BisectingKMeans,
    BisectingKMeansModel,
    BisectingKMeansSummary,
    GaussianMixture,
    GaussianMixtureModel,
    GaussianMixtureSummary,
    LDA,
    LDAModel,
    LocalLDAModel,
    DistributedLDAModel,
    PowerIterationClustering,
)
from pyspark.testing.sqlutils import ReusedSQLTestCase


class ClusteringTestsMixin:
    def test_kmeans(self):
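        # End-to-end check of KMeans: param getters, fit, cluster centers,
        # transform/predict, the model summary, and a save/load round-trip.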
        df = (
            self.spark.createDataFrame(
                [
                    (1, 1.0, Vectors.dense([-0.1, -0.05])),
                    (2, 2.0, Vectors.dense([-0.01, -0.1])),
                    (3, 3.0, Vectors.dense([0.9, 0.8])),
                    (4, 1.0, Vectors.dense([0.75, 0.935])),
                    (5, 1.0, Vectors.dense([-0.83, -0.68])),
                    (6, 1.0, Vectors.dense([-0.91, -0.76])),
                ],
                ["index", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("index")
            .select("weight", "features")
        )

        km = KMeans(
            k=2,
            maxIter=2,
            weightCol="weight",
        )
        self.assertEqual(km.getK(), 2)
        self.assertEqual(km.getMaxIter(), 2)
        self.assertEqual(km.getWeightCol(), "weight")

        model = km.fit(df)
        self.assertEqual(km.uid, model.uid)
        centers = model.clusterCenters()
        self.assertEqual(len(centers), 2)
        self.assertTrue(np.allclose(centers[0], [-0.372, -0.338], atol=1e-3), centers[0])
        self.assertTrue(np.allclose(centers[1], [0.8625, 0.83375], atol=1e-3), centers[1])
        # TODO: support KMeansModel.numFeatures in Python
        # self.assertEqual(model.numFeatures, 2)

        output = model.transform(df)
        expected_cols = ["weight", "features", "prediction"]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 6)
        self.assertTrue(np.allclose(model.predict(Vectors.dense(0.0, 5.0)), 1, atol=1e-4))

        # Model summary
        self.assertTrue(model.hasSummary)
        summary = model.summary
        self.assertTrue(isinstance(summary, KMeansSummary))
        self.assertEqual(summary.k, 2)
        self.assertEqual(summary.numIter, 2)
        self.assertEqual(summary.clusterSizes, [4, 2])
        self.assertTrue(np.allclose(summary.trainingCost, 1.35710375, atol=1e-4))
        self.assertEqual(summary.featuresCol, "features")
        self.assertEqual(summary.predictionCol, "prediction")
        self.assertEqual(summary.cluster.columns, ["prediction"])
        self.assertEqual(summary.cluster.count(), 6)
        self.assertEqual(summary.predictions.columns, expected_cols)
        self.assertEqual(summary.predictions.count(), 6)

        # save & load
        with tempfile.TemporaryDirectory(prefix="kmeans_model") as d:
            km.write().overwrite().save(d)
            km2 = KMeans.load(d)
            self.assertEqual(str(km), str(km2))

            model.write().overwrite().save(d)
            model2 = KMeansModel.load(d)
            self.assertEqual(str(model), str(model2))

    def test_bisecting_kmeans(self):
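        # Same end-to-end shape as test_kmeans, exercising the BisectingKMeans
        # estimator, its computeCost method, the summary, and persistence.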
        df = (
            self.spark.createDataFrame(
                [
                    (1, 1.0, Vectors.dense([-0.1, -0.05])),
                    (2, 2.0, Vectors.dense([-0.01, -0.1])),
                    (3, 3.0, Vectors.dense([0.9, 0.8])),
                    (4, 1.0, Vectors.dense([0.75, 0.935])),
                    (5, 1.0, Vectors.dense([-0.83, -0.68])),
                    (6, 1.0, Vectors.dense([-0.91, -0.76])),
                ],
                ["index", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("index")
            .select("weight", "features")
        )

        bkm = BisectingKMeans(
            k=2,
            maxIter=2,
            minDivisibleClusterSize=1.0,
            weightCol="weight",
        )
        self.assertEqual(bkm.getK(), 2)
        self.assertEqual(bkm.getMaxIter(), 2)
        self.assertEqual(bkm.getMinDivisibleClusterSize(), 1.0)
        self.assertEqual(bkm.getWeightCol(), "weight")

        model = bkm.fit(df)
        self.assertEqual(bkm.uid, model.uid)
        centers = model.clusterCenters()
        self.assertEqual(len(centers), 2)
        self.assertTrue(np.allclose(centers[0], [-0.372, -0.338], atol=1e-3), centers[0])
        self.assertTrue(np.allclose(centers[1], [0.8625, 0.83375], atol=1e-3), centers[1])
        # TODO: support BisectingKMeansModel.numFeatures in Python
        # self.assertEqual(model.numFeatures, 2)
        output = model.transform(df)
        expected_cols = ["weight", "features", "prediction"]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 6)
        self.assertTrue(np.allclose(model.predict(Vectors.dense(0.0, 5.0)), 1, atol=1e-4))
        # BisectingKMeans-specific method: computeCost
        # (deprecated since Spark 3.0; ClusteringEvaluator is the recommended replacement)
        self.assertTrue(np.allclose(model.computeCost(df), 1.164325125, atol=1e-4))
        # Model summary
        self.assertTrue(model.hasSummary)
        summary = model.summary
        self.assertTrue(isinstance(summary, BisectingKMeansSummary))
        self.assertEqual(summary.k, 2)
        self.assertEqual(summary.numIter, 2)
        self.assertEqual(summary.clusterSizes, [4, 2])
        self.assertTrue(np.allclose(summary.trainingCost, 1.3571037499999998, atol=1e-4))
        self.assertEqual(summary.featuresCol, "features")
        self.assertEqual(summary.predictionCol, "prediction")
        self.assertEqual(summary.cluster.columns, ["prediction"])
        self.assertEqual(summary.cluster.count(), 6)
        self.assertEqual(summary.predictions.columns, expected_cols)
        self.assertEqual(summary.predictions.count(), 6)

        # save & load
        with tempfile.TemporaryDirectory(prefix="bisecting_kmeans") as d:
            bkm.write().overwrite().save(d)
            bkm2 = BisectingKMeans.load(d)
            self.assertEqual(str(bkm), str(bkm2))

            model.write().overwrite().save(d)
            model2 = BisectingKMeansModel.load(d)
            self.assertEqual(str(model), str(model2))

    def test_gaussian_mixture(self):
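        # Fits a 2-component GaussianMixture with a fixed seed, then checks the
        # learned weights and Gaussians, per-point probabilities, the summary,
        # and a save/load round-trip.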
        spark = self.spark
        df = (
            spark.createDataFrame(
                [
                    (1, 1.0, Vectors.dense([-0.1, -0.05])),
                    (2, 2.0, Vectors.dense([-0.01, -0.1])),
                    (3, 3.0, Vectors.dense([0.9, 0.8])),
                    (4, 1.0, Vectors.dense([0.75, 0.935])),
                    (5, 1.0, Vectors.dense([-0.83, -0.68])),
                    (6, 1.0, Vectors.dense([-0.91, -0.76])),
                ],
                ["index", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("index")
            .select("weight", "features")
        )

        gmm = GaussianMixture(
            k=2,
            maxIter=2,
            weightCol="weight",
            seed=1,
        )
        self.assertEqual(gmm.getK(), 2)
        self.assertEqual(gmm.getMaxIter(), 2)
        self.assertEqual(gmm.getWeightCol(), "weight")
        self.assertEqual(gmm.getSeed(), 1)

        model = gmm.fit(df)
        self.assertEqual(gmm.uid, model.uid)
        # TODO: support GMM.numFeatures in Python
        # self.assertEqual(model.numFeatures, 2)
        self.assertEqual(len(model.weights), 2)
        self.assertTrue(
            np.allclose(model.weights, [0.541014115744985, 0.4589858842550149], atol=1e-4),
            model.weights,
        )

        # check the gaussians
        gaussians = model.gaussians
        self.assertEqual(len(gaussians), 2)
        self.assertTrue(
            np.allclose(
                gaussians[0].mean.toArray(),
                [0.28586084899633746, 0.28513455726904297],
                atol=1e-4,
            ),
            gaussians[0].mean,
        )
        self.assertTrue(
            np.allclose(
                gaussians[0].cov.toArray(),
                [[0.41732752, 0.38378601], [0.38378601, 0.36454957]],
                atol=1e-4,
            ),
            gaussians[0].cov,
        )
        self.assertTrue(
            np.allclose(
                gaussians[1].mean.toArray(),
                [0.04795771063097124, 0.06212817950777127],
                atol=1e-4,
            ),
            gaussians[1].mean,
        )
        self.assertTrue(
            np.allclose(
                gaussians[1].cov.toArray(),
                [[0.50359595, 0.44696663], [0.44696663, 0.40424231]],
                atol=1e-4,
            ),
            gaussians[1].cov,
        )
        self.assertEqual(model.gaussiansDF.columns, ["mean", "cov"])
        self.assertEqual(model.gaussiansDF.count(), 2)

        vec = Vectors.dense(0.0, 5.0)
        pred = model.predict(vec)
        self.assertTrue(np.allclose(pred, 0, atol=1e-4), pred)
        pred = model.predictProbability(vec)
        self.assertTrue(np.allclose(pred.toArray(), [0.5, 0.5], atol=1e-4), pred)

        output = model.transform(df)
        expected_cols = ["weight", "features", "probability", "prediction"]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 6)

        # Model summary
        self.assertTrue(model.hasSummary)
        summary = model.summary
        self.assertTrue(isinstance(summary, GaussianMixtureSummary))
        self.assertEqual(summary.k, 2)
        self.assertEqual(summary.numIter, 2)
        self.assertEqual(len(summary.clusterSizes), 2)
        self.assertEqual(summary.clusterSizes, [3, 3])
        ll = summary.logLikelihood
        self.assertTrue(ll < 0, ll)
        self.assertTrue(np.allclose(ll, -1.311264553744033, atol=1e-4), ll)
        self.assertEqual(summary.featuresCol, "features")
        self.assertEqual(summary.predictionCol, "prediction")
        self.assertEqual(summary.probabilityCol, "probability")
        self.assertEqual(summary.cluster.columns, ["prediction"])
        self.assertEqual(summary.cluster.count(), 6)
        self.assertEqual(summary.predictions.columns, expected_cols)
        self.assertEqual(summary.predictions.count(), 6)
        self.assertEqual(summary.probability.columns, ["probability"])
        self.assertEqual(summary.probability.count(), 6)

        # save & load
        with tempfile.TemporaryDirectory(prefix="gaussian_mixture") as d:
            gmm.write().overwrite().save(d)
            gmm2 = GaussianMixture.load(d)
            self.assertEqual(str(gmm), str(gmm2))

            model.write().overwrite().save(d)
            model2 = GaussianMixtureModel.load(d)
            self.assertEqual(str(model), str(model2))

    def test_local_lda(self):
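        # The online optimizer produces a LocalLDAModel; verifies the estimated
        # topic matrix, likelihood/perplexity, and persistence.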
        spark = self.spark
        df = (
            spark.createDataFrame(
                [
                    [1, Vectors.dense([0.0, 1.0])],
                    [2, SparseVector(2, {0: 1.0})],
                ],
                ["id", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("id")
        )

        lda = LDA(k=2, optimizer="online", seed=1)
        lda.setMaxIter(1)
        self.assertEqual(lda.getK(), 2)
        self.assertEqual(lda.getOptimizer(), "online")
        self.assertEqual(lda.getMaxIter(), 1)
        self.assertEqual(lda.getSeed(), 1)

        model = lda.fit(df)
        self.assertEqual(lda.uid, model.uid)
        self.assertIsInstance(model, LDAModel)
        self.assertIsInstance(model, LocalLDAModel)
        self.assertNotIsInstance(model, DistributedLDAModel)
        self.assertFalse(model.isDistributed())

        dc = model.estimatedDocConcentration()
        self.assertTrue(np.allclose(dc.toArray(), [0.5, 0.5], atol=1e-4), dc)
        topics = model.topicsMatrix()
        self.assertTrue(
            np.allclose(
                topics.toArray(), [[1.20296728, 1.15740442], [0.99357675, 1.02993164]], atol=1e-4
            ),
            topics,
        )
        ll = model.logLikelihood(df)
        self.assertTrue(np.allclose(ll, -3.2125122434040088, atol=1e-4), ll)
        lp = model.logPerplexity(df)
        self.assertTrue(np.allclose(lp, 1.6062561217020044, atol=1e-4), lp)
        dt = model.describeTopics()
        self.assertEqual(dt.columns, ["topic", "termIndices", "termWeights"])
        self.assertEqual(dt.count(), 2)
        # vocabSize is available on any LDAModel (local or distributed)
        self.assertEqual(model.vocabSize(), 2)
        output = model.transform(df)
        expected_cols = ["id", "features", "topicDistribution"]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 2)

        # save & load
        with tempfile.TemporaryDirectory(prefix="local_lda") as d:
            lda.write().overwrite().save(d)
            lda2 = LDA.load(d)
            self.assertEqual(str(lda), str(lda2))

            model.write().overwrite().save(d)
            model2 = LocalLDAModel.load(d)
            self.assertEqual(str(model), str(model2))

    def test_distributed_lda(self):
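        # The "em" optimizer produces a DistributedLDAModel; additionally
        # exercises trainingLogLikelihood, logPrior, and toLocal().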
        spark = self.spark
        df = (
            spark.createDataFrame(
                [
                    [1, Vectors.dense([0.0, 1.0])],
                    [2, SparseVector(2, {0: 1.0})],
                ],
                ["id", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("id")
        )

        lda = LDA(k=2, optimizer="em", seed=1)
        lda.setMaxIter(1)
        self.assertEqual(lda.getK(), 2)
        self.assertEqual(lda.getOptimizer(), "em")
        self.assertEqual(lda.getMaxIter(), 1)
        self.assertEqual(lda.getSeed(), 1)

        model = lda.fit(df)
        self.assertEqual(lda.uid, model.uid)
        self.assertIsInstance(model, LDAModel)
        self.assertNotIsInstance(model, LocalLDAModel)
        self.assertIsInstance(model, DistributedLDAModel)
        self.assertTrue(model.isDistributed())
        self.assertEqual(model.vocabSize(), 2)

        dc = model.estimatedDocConcentration()
        self.assertTrue(np.allclose(dc.toArray(), [26.0, 26.0], atol=1e-4), dc)
        topics = model.topicsMatrix()
        self.assertTrue(
            np.allclose(
                topics.toArray(), [[0.39149926, 0.60850074], [0.60991237, 0.39008763]], atol=1e-4
            ),
            topics,
        )
        ll = model.logLikelihood(df)
        self.assertTrue(np.allclose(ll, -3.719138517085772, atol=1e-4), ll)
        lp = model.logPerplexity(df)
        self.assertTrue(np.allclose(lp, 1.859569258542886, atol=1e-4), lp)
        dt = model.describeTopics()
        self.assertEqual(dt.columns, ["topic", "termIndices", "termWeights"])
        self.assertEqual(dt.count(), 2)

        # DistributedLDAModel specific methods
        ll = model.trainingLogLikelihood()
        self.assertTrue(np.allclose(ll, -1.3847360462201639, atol=1e-4), ll)
        lp = model.logPrior()
        self.assertTrue(np.allclose(lp, -69.59963186898915, atol=1e-4), lp)
        model.getCheckpointFiles()  # smoke test: should not raise
        output = model.transform(df)
        expected_cols = ["id", "features", "topicDistribution"]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 2)

        # Test toLocal()
        localModel = model.toLocal()
        self.assertIsInstance(localModel, LDAModel)
        self.assertIsInstance(localModel, LocalLDAModel)
        self.assertNotIsInstance(localModel, DistributedLDAModel)
        self.assertFalse(localModel.isDistributed())

        output = localModel.transform(df)
        expected_cols = ["id", "features", "topicDistribution"]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 2)

        # save & load
        with tempfile.TemporaryDirectory(prefix="distributed_lda") as d:
            lda.write().overwrite().save(d)
            lda2 = LDA.load(d)
            self.assertEqual(str(lda), str(lda2))

            model.write().overwrite().save(d)
            model2 = DistributedLDAModel.load(d)
            self.assertEqual(str(model), str(model2))

    def test_power_iteration_clustering(self):
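        # PowerIterationClustering produces no model: assignClusters returns
        # one (id, cluster) row per vertex of the affinity graph (ids 0-5 here).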
        spark = self.spark
        data = [
            (1, 0, 0.5),
            (2, 0, 0.5),
            (2, 1, 0.7),
            (3, 0, 0.5),
            (3, 1, 0.7),
            (3, 2, 0.9),
            (4, 0, 0.5),
            (4, 1, 0.7),
            (4, 2, 0.9),
            (4, 3, 1.1),
            (5, 0, 0.5),
            (5, 1, 0.7),
            (5, 2, 0.9),
            (5, 3, 1.1),
            (5, 4, 1.3),
        ]
        df = spark.createDataFrame(data, ["src", "dst", "weight"]).repartition(1)

        pic = PowerIterationClustering(k=2, weightCol="weight")
        pic.setMaxIter(40)
        self.assertEqual(pic.getK(), 2)
        self.assertEqual(pic.getMaxIter(), 40)
        self.assertEqual(pic.getWeightCol(), "weight")

        assignments = pic.assignClusters(df)
        self.assertEqual(assignments.columns, ["id", "cluster"])
        self.assertEqual(assignments.count(), 6)

        # save & load
        with tempfile.TemporaryDirectory(prefix="power_iteration_clustering") as d:
            pic.write().overwrite().save(d)
            pic2 = PowerIterationClustering.load(d)
            self.assertEqual(str(pic), str(pic2))
            self.assertEqual(pic.uid, pic2.uid)
            self.assertEqual(pic.getK(), pic2.getK())
            self.assertEqual(pic.getMaxIter(), pic2.getMaxIter())
            self.assertEqual(pic.getWeightCol(), pic2.getWeightCol())


class ClusteringTests(ClusteringTestsMixin, ReusedSQLTestCase):
    pass


if __name__ == "__main__":
    from pyspark.ml.tests.test_clustering import *  # noqa: F401,F403

    try:
        import xmlrunner  # type: ignore[import]

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)