| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| from numpy import array, dot |
| from math import sqrt |
| from pyspark import SparkContext |
| from pyspark.mllib._common import \ |
| _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \ |
| _serialize_double_matrix, _deserialize_double_matrix, \ |
| _serialize_double_vector, _deserialize_double_vector, \ |
| _get_initial_weights, _serialize_rating, _regression_train_wrapper |
| from pyspark.mllib.linalg import SparseVector |
| |
| |
class KMeansModel(object):

    """A clustering model derived from the k-means method.

    The model stores the cluster centers it is constructed with and assigns
    a point to the center with the smallest squared distance.

    >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
    >>> model = KMeans.train(
    ...     sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
    >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
    True
    >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
    True
    >>> model = KMeans.train(sc.parallelize(data), 2)
    >>> sparse_data = [
    ...     SparseVector(3, {1: 1.0}),
    ...     SparseVector(3, {1: 1.1}),
    ...     SparseVector(3, {2: 1.0}),
    ...     SparseVector(3, {2: 1.1})
    ... ]
    >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")
    >>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
    True
    >>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
    True
    >>> model.predict(sparse_data[0]) == model.predict(sparse_data[1])
    True
    >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
    True
    >>> type(model.clusterCenters)
    <type 'list'>
    """

    def __init__(self, centers):
        # Stored as given; KMeans.train passes a list with one entry per center.
        self.centers = centers

    @property
    def clusterCenters(self):
        """Return the stored cluster centers (the object given at construction)."""
        return self.centers

    def predict(self, x):
        """Return the index of the center closest to x by squared distance.

        Ties go to the lowest index because only a strictly smaller distance
        replaces the current winner. With no centers at all the result is 0.
        """
        closest_index = 0
        smallest = float("inf")
        for index, center in enumerate(self.centers):
            candidate = _squared_distance(x, center)
            if candidate < smallest:
                closest_index, smallest = index, candidate
        return closest_index
| |
| |
class KMeans(object):

    """Factory for KMeansModel instances trained through the JVM MLlib API."""

    @classmethod
    def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"):
        """Train a k-means clustering model.

        The input is converted with _get_unmangled_double_vector_rdd and the
        actual training is delegated to PythonMLLibAPI().trainKMeansModel on
        the JVM reached through the context of `data`.

        :param data: RDD of training points; the class doctests use NumPy
                     arrays and SparseVector instances.
        :param k: forwarded unchanged to the JVM trainer (presumably the
                  number of clusters -- confirm against the Scala API).
        :param maxIterations: forwarded unchanged to the JVM trainer.
        :param runs: forwarded unchanged to the JVM trainer.
        :param initializationMode: forwarded unchanged to the JVM trainer;
                  the class doctests use "random" and "k-means||".
        :return: a KMeansModel whose centers are the rows of the matrix
                 deserialized from the JVM result.
        :raises RuntimeError: if the JVM result is not a single bytearray.
        """
        sc = data.context
        dataBytes = _get_unmangled_double_vector_rdd(data)
        ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(
            dataBytes._jrdd, k, maxIterations, runs, initializationMode)
        if len(ans) != 1:
            raise RuntimeError("JVM call result had unexpected length")
        if not isinstance(ans[0], bytearray):
            # str() is required here: concatenating a type object directly
            # onto a str raises TypeError and would hide this error.
            raise RuntimeError("JVM call result had first element of type "
                               + str(type(ans[0])) + " which is not bytearray")
        matrix = _deserialize_double_matrix(ans[0])
        # Iterating the matrix yields one row per cluster center.
        return KMeansModel(list(matrix))
| |
| |
def _test():
    """Run this module's doctests against a local SparkContext.

    A copy of the module globals, extended with an ``sc`` entry bound to
    SparkContext('local[4]', 'PythonTest', batchSize=2), is handed to
    doctest.testmod. The process exits with status -1 if any doctest fails.
    """
    import doctest
    import sys
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    try:
        failure_count, _ = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Stop the context even if testmod itself raises, so it is not leaked.
        globs['sc'].stop()
    if failure_count:
        # sys.exit rather than the site-provided exit(), which is intended
        # for interactive use and is missing when site is not loaded.
        sys.exit(-1)
| |
| |
# Run the module's doctests when this file is executed directly as a script.
if __name__ == "__main__":
    _test()