blob: 57a88538064069f0ae3d347f24d8738e87e05242 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
import typing
from pyflink.common import Types, Row
from typing import List, Dict, Set
from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo, DenseVector
from pyflink.ml.clustering.kmeans import KMeans, KMeansModel, OnlineKMeans
from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
def group_features_by_prediction(
rows: List[Row], feature_index: int, prediction_index: int):
map = {} # type: Dict[int, Set]
for row in rows:
vector = typing.cast(DenseVector, row[feature_index])
predict = typing.cast(int, row[prediction_index])
if predict in map:
l = map[predict]
else:
l = set()
map[predict] = l
l.add(vector)
return [item for item in map.values()]
class KMeansTest(PyFlinkMLTestCase):
def setUp(self):
super(KMeansTest, self).setUp()
self.data_table = self.t_env.from_data_stream(
self.env.from_collection([
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([0.0, 0.3]),),
(Vectors.dense([0.3, 0.0]),),
(Vectors.dense([9.0, 0.0]),),
(Vectors.dense([9.0, 0.6]),),
(Vectors.dense([9.6, 0.0]),),
],
type_info=Types.ROW_NAMED(
['features'],
[DenseVectorTypeInfo()])))
self.expected_groups = [
{DenseVector([0.0, 0.3]), DenseVector([0.3, 0.0]), DenseVector([0.0, 0.0])},
{DenseVector([9.6, 0.0]), DenseVector([9.0, 0.0]), DenseVector([9.0, 0.6])}]
def test_param(self):
kmeans = KMeans()
self.assertEqual('features', kmeans.get_features_col())
self.assertEqual('prediction', kmeans.get_prediction_col())
self.assertEqual('euclidean', kmeans.get_distance_measure())
self.assertEqual('random', kmeans.get_init_mode())
self.assertEqual(2, kmeans.get_k())
self.assertEqual(20, kmeans.get_max_iter())
kmeans.set_k(9) \
.set_features_col('test_feature') \
.set_prediction_col('test_prediction') \
.set_k(3) \
.set_max_iter(30) \
.set_seed(100)
self.assertEqual('test_feature', kmeans.get_features_col())
self.assertEqual('test_prediction', kmeans.get_prediction_col())
self.assertEqual(3, kmeans.get_k())
self.assertEqual(30, kmeans.get_max_iter())
self.assertEqual(100, kmeans.get_seed())
def test_output_schema(self):
input = self.data_table.alias('test_feature')
kmeans = KMeans().set_features_col('test_feature').set_prediction_col('test_prediction')
model = kmeans.fit(input)
output = model.transform(input)[0]
field_names = output.get_schema().get_field_names()
self.assertEqual(['test_feature', 'test_prediction'],
field_names)
results = [result for result in self.t_env.to_data_stream(output).execute_and_collect()]
actual_groups = group_features_by_prediction(
results,
field_names.index(kmeans.features_col),
field_names.index(kmeans.prediction_col))
self.assertTrue(actual_groups[0] == self.expected_groups[0]
and actual_groups[1] == self.expected_groups[1] or
actual_groups[0] == self.expected_groups[1]
and actual_groups[1] == self.expected_groups[0])
def test_fewer_distinct_points_than_cluster(self):
input = self.t_env.from_data_stream(
self.env.from_collection([
(Vectors.dense([0.0, 0.1]),),
(Vectors.dense([0.0, 0.1]),),
(Vectors.dense([0.0, 0.1]),),
],
type_info=Types.ROW_NAMED(
['features'],
[DenseVectorTypeInfo()])))
kmeans = KMeans().set_k(2)
model = kmeans.fit(input)
output = model.transform(input)[0]
results = [result for result in self.t_env.to_data_stream(output).execute_and_collect()]
field_names = output.get_schema().get_field_names()
actual_groups = group_features_by_prediction(
results,
field_names.index(kmeans.features_col),
field_names.index(kmeans.prediction_col))
expected_groups = [{DenseVector([0.0, 0.1])}]
self.assertEqual(actual_groups, expected_groups)
def test_fit_and_predict(self):
kmeans = KMeans().set_max_iter(2).set_k(2)
model = kmeans.fit(self.data_table)
output = model.transform(self.data_table)[0]
self.assertEqual(['features', 'prediction'], output.get_schema().get_field_names())
results = [result for result in self.t_env.to_data_stream(output).execute_and_collect()]
field_names = output.get_schema().get_field_names()
actual_groups = group_features_by_prediction(
results,
field_names.index(kmeans.features_col),
field_names.index(kmeans.prediction_col))
self.assertTrue(actual_groups[0] == self.expected_groups[0]
and actual_groups[1] == self.expected_groups[1] or
actual_groups[0] == self.expected_groups[1]
and actual_groups[1] == self.expected_groups[0])
def test_get_model_data(self):
kmeans = KMeans().set_max_iter(2).set_k(2)
model = kmeans.fit(self.data_table)
model_data = model.get_model_data()[0]
expected_field_names = ['centroids', 'weights']
self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
model_rows = [result for result in
self.t_env.to_data_stream(model_data).execute_and_collect()]
self.assertEqual(1, len(model_rows))
centroids = model_rows[0][expected_field_names.index('centroids')]
self.assertEqual(2, len(centroids))
centroids.sort(key=lambda x: x.get(0))
self.assertListAlmostEqual([0.1, 0.1], centroids[0], delta=1e-5)
self.assertListAlmostEqual([9.2, 0.2], centroids[1], delta=1e-5)
def test_set_model_data(self):
kmeans = KMeans().set_max_iter(2).set_k(2)
model_a = kmeans.fit(self.data_table)
model_data = model_a.get_model_data()[0]
model_b = KMeansModel().set_model_data(model_data)
update_existing_params(model_b, model_a)
output = model_b.transform(self.data_table)[0]
self.assertEqual(['features', 'prediction'], output.get_schema().get_field_names())
results = [result for result in self.t_env.to_data_stream(output).execute_and_collect()]
field_names = output.get_schema().get_field_names()
actual_groups = group_features_by_prediction(
results,
field_names.index(kmeans.features_col),
field_names.index(kmeans.prediction_col))
self.assertTrue(actual_groups[0] == self.expected_groups[0]
and actual_groups[1] == self.expected_groups[1] or
actual_groups[0] == self.expected_groups[1]
and actual_groups[1] == self.expected_groups[0])
def test_save_load_and_predict(self):
kmeans = KMeans().set_max_iter(2).set_k(2)
model = kmeans.fit(self.data_table)
path = os.path.join(self.temp_dir, 'test_save_load_and_predict_kmeans_model')
model.save(path)
self.env.execute()
loaded_model = KMeansModel.load(self.t_env, path) # type: KMeansModel
output = loaded_model.transform(self.data_table)[0]
self.assertEqual(
['centroids', 'weights'],
loaded_model.get_model_data()[0].get_schema().get_field_names())
self.assertEqual(
['features', 'prediction'],
output.get_schema().get_field_names())
results = [result for result in self.t_env.to_data_stream(output).execute_and_collect()]
field_names = output.get_schema().get_field_names()
actual_groups = group_features_by_prediction(
results,
field_names.index(kmeans.features_col),
field_names.index(kmeans.prediction_col))
self.assertTrue(actual_groups[0] == self.expected_groups[0]
and actual_groups[1] == self.expected_groups[1] or
actual_groups[0] == self.expected_groups[1]
and actual_groups[1] == self.expected_groups[0])
class OnlineKMeansTest(PyFlinkMLTestCase):
def setUp(self):
super(OnlineKMeansTest, self).setUp()
def test_param(self):
online_kmeans = OnlineKMeans()
self.assertEqual('features', online_kmeans.features_col)
self.assertEqual('prediction', online_kmeans.prediction_col)
self.assertEqual('count', online_kmeans.batch_strategy)
self.assertEqual('euclidean', online_kmeans.distance_measure)
self.assertEqual(32, online_kmeans.global_batch_size)
self.assertEqual(0., online_kmeans.decay_factor)
online_kmeans.set_features_col('test_feature') \
.set_prediction_col('test_prediction') \
.set_global_batch_size(5) \
.set_decay_factor(0.25) \
.set_seed(100)
self.assertEqual('test_feature', online_kmeans.features_col)
self.assertEqual('test_prediction', online_kmeans.prediction_col)
self.assertEqual('count', online_kmeans.batch_strategy)
self.assertEqual('euclidean', online_kmeans.distance_measure)
self.assertEqual(5, online_kmeans.global_batch_size)
self.assertEqual(0.25, online_kmeans.decay_factor)
self.assertEqual(100, online_kmeans.get_seed())