#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
# Builtin function that implements the k-Means clustering algorithm
#
# INPUT PARAMETERS:
# ----------------------------------------------------------------------------
# NAME                          TYPE            DEFAULT   MEANING
# ----------------------------------------------------------------------------
# X                             Matrix[Double]  ---       The input matrix of records to cluster (one record per row)
# k                             Int             10        Number of centroids
# runs                          Int             10        Number of runs (with different initial centroids)
# max_iter                      Int             1000      Maximum number of iterations per run
# eps                           Double          0.000001  Tolerance (epsilon) for WCSS change ratio
# is_verbose                    Boolean         FALSE     Print per-iteration statistics when TRUE
# avg_sample_size_per_centroid  Int             50        Average number of records per centroid in data samples
# seed                          Int             -1        Seed for the initial sampling; -1 selects a random seed
#
#
# RETURN VALUES
# ----------------------------------------------------------------------------
# NAME  TYPE            MEANING
# ----------------------------------------------------------------------------
# C     Matrix[Double]  The output matrix with the centroids
# Y     Matrix[Double]  The mapping of records to centroids (1-based cluster ids)
# ----------------------------------------------------------------------------
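#
# Example (illustrative sketch; assumes this builtin is invoked under the
# name "kmeans", following the m_<name> builtin-function convention):
#   X = rand (rows = 1000, cols = 2, seed = 42);
#   [C, Y] = kmeans (X = X, k = 5, runs = 10, eps = 1e-6, seed = 42);
# ----------------------------------------------------------------------------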
m_kmeans = function(Matrix[Double] X, Integer k = 10, Integer runs = 10, Integer max_iter = 1000,
Double eps = 0.000001, Boolean is_verbose = FALSE, Integer avg_sample_size_per_centroid = 50,
Integer seed = -1)
return (Matrix[Double] C, Matrix[Double] Y)
{
if( is_verbose )
print ("BEGIN K-MEANS SCRIPT");
num_records = nrow (X);
num_features = ncol (X);
num_centroids = k;
num_runs = runs;
if(is_verbose)
print("dim X=" + nrow(X) + "x" + ncol(X))
sumXsq = sum (X ^ 2);
# STEP 1: INITIALIZE CENTROIDS FOR ALL RUNS FROM DATA SAMPLES:
if( is_verbose )
print ("Taking data samples for initialization...");
[sample_maps, samples_vs_runs_map, sample_block_size] = get_sample_maps(
num_records, num_runs, num_centroids * avg_sample_size_per_centroid);
is_row_in_samples = rowSums (sample_maps);
X_samples = sample_maps %*% X;
X_samples_sq_norms = rowSums (X_samples ^ 2);
if( is_verbose )
print ("Initializing the centroids for all runs...");
All_Centroids = matrix (0, num_runs * num_centroids, num_features);
# We select centroids according to the k-Means++ heuristic applied to a sample of X
# Loop invariant: min_distances ~ sq.distances from X_sample rows to nearest centroids,
# with the out-of-range X_sample positions in min_distances set to 0.0
min_distances = is_row_in_samples; # Pick the first centroid of each run uniformly at random
for (i in 1 : num_centroids)
{
# "Matricize" and prefix-sum to compute the cumulative distribution function:
min_distances_matrix_form = matrix (min_distances, rows = sample_block_size, cols = num_runs, byrow = FALSE);
cdf_min_distances = cumsum (min_distances_matrix_form);
# Select the i-th centroid in each sample as a random sample row id with
# probability ~ min_distances:
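# E.g., if a run's cdf column is (2, 5, 6), its threshold is u * 6 for uniform
# u in [0,1), and colSums(cdf < threshold) + 1 yields id 2 exactly when the
# threshold falls in (2, 5], i.e. with probability 3/6 ~ min_distances[2].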
random_row = rand(rows = 1, cols = num_runs, seed = seed);
threshold_matrix = random_row * cdf_min_distances [sample_block_size, ];
centroid_ids = t(colSums (cdf_min_distances < threshold_matrix)) + 1;
# Place the selected centroids together, one per run, into a matrix:
centroid_placer = table (seq (1, num_runs),
sample_block_size * seq (0, num_runs - 1) + centroid_ids, num_runs, sample_block_size * num_runs);
centroids = centroid_placer %*% X_samples;
# Place the selected centroids into their appropriate slots in All_Centroids:
centroid_placer = table (seq (i, num_centroids * (num_runs - 1) + i, num_centroids),
seq (1, num_runs, 1), nrow (All_Centroids), num_runs);
All_Centroids = All_Centroids + centroid_placer %*% centroids;
# Update min_distances to preserve the loop invariant:
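# Uses ||x - c||^2 = ||x||^2 + ||c||^2 - 2 * (x dot c); samples_vs_runs_map
# broadcasts each run's new centroid (and its squared norm) to that run's
# sample rows.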
distances = X_samples_sq_norms + samples_vs_runs_map %*% rowSums (centroids ^ 2)
- 2 * rowSums (X_samples * (samples_vs_runs_map %*% centroids));
min_distances = ifelse(i==1, is_row_in_samples*distances, min(min_distances,distances));
}
# STEP 2: PERFORM K-MEANS ITERATIONS FOR ALL RUNS:
termination_code = matrix (0, rows = num_runs, cols = 1);
final_wcss = matrix (0, rows = num_runs, cols = 1);
num_iterations = matrix (0, rows = num_runs, cols = 1);
if( is_verbose )
print ("Performing k-means iterations for all runs...");
parfor (run_index in 1 : num_runs, check = 0)
{
C = All_Centroids [(num_centroids * (run_index - 1) + 1) : (num_centroids * run_index), ];
C_old = C;
iter_count = 0;
term_code = 0;
wcss = Inf
while (term_code == 0)
{
# Compute Euclidean squared distances from records (X rows) to centroids (C rows)
# without the C-independent term, then take the minimum for each record
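# (The omitted term ||x||^2 is constant per record and cannot change which
# centroid is closest; it is added back via sumXsq when computing the WCSS.)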
D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2));
minD = rowMins (D);
# Compute the current centroid-based within-cluster sum of squares (WCSS)
wcss_old = wcss;
wcss = sumXsq + sum (minD);
if( is_verbose ) {
if (iter_count == 0)
print ("Run " + run_index + ", At Start-Up: Centroid WCSS = " + wcss);
else
print ("Run " + run_index + ", Iteration " + iter_count + ": Centroid WCSS = " + wcss
+ "; Centroid change (avg.sq.dist.) = " + (sum ((C - C_old) ^ 2) / num_centroids));
}
# Find the closest centroid for each record
P = D <= minD;
# If a record is equally close to multiple centroids, share it equally among them
P = P / rowSums (P);
# Compute the column normalization factor for P
P_denom = colSums (P);
# Compute new centroids as weighted averages over the records
C_new = (t(P) %*% X) / t(P_denom);
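# (Row j of C_new is the mean of the records assigned to centroid j:
# t(P) %*% X sums them, and t(P_denom) holds the per-cluster record
# counts, broadcast column-wise as the divisor.)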
# Check if convergence or maximum iteration has been reached
iter_count = iter_count + 1
if(wcss_old - wcss < eps * wcss)
term_code = 1; # Convergence reached
else if(iter_count >= max_iter)
term_code = 2; # Max iteration reached
else if(sum (P_denom <= 0) > 0)
term_code = 3; # "Runaway" centroid (0.0 denom)
else {
C_old = C; C = C_new;
}
}
if(is_verbose)
print ("Run " + run_index + ", Iteration " + iter_count + ": Terminated with code = "
+ term_code + ", Centroid WCSS = " + wcss);
All_Centroids [(num_centroids * (run_index - 1) + 1) : (num_centroids * run_index), ] = C;
final_wcss [run_index, 1] = wcss;
termination_code [run_index, 1] = term_code;
num_iterations [run_index, 1] = iter_count;
}
# STEP 3: SELECT THE RUN WITH BEST CENTROID-WCSS AND OUTPUT ITS CENTROIDS:
termination_bitmap = matrix (0, num_runs, 3);
termination_bitmap_raw = table (seq (1, num_runs, 1), termination_code);
termination_bitmap [, 1 : ncol(termination_bitmap_raw)] = termination_bitmap_raw;
termination_stats = colSums (termination_bitmap);
if(is_verbose){
print ("Number of successful runs = " + as.integer (as.scalar (termination_stats [1, 1])));
print ("Number of incomplete runs = " + as.integer (as.scalar (termination_stats [1, 2])));
print ("Number of failed runs (with lost centroids) = " + as.integer (as.scalar (termination_stats [1, 3])));
}
num_successful_runs = as.scalar (termination_stats [1, 1]);
if (num_successful_runs > 0)
{
final_wcss_successful = replace (target = final_wcss, pattern = Inf, replacement = 0) * termination_bitmap [, 1];
worst_wcss = max (final_wcss_successful);
best_wcss = min (final_wcss_successful + (10 * worst_wcss + 10) * (1 - termination_bitmap [, 1]));
avg_wcss = sum (final_wcss_successful) / num_successful_runs;
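# The cumsum trick below locates the first (lowest-index) run attaining
# best_wcss: the cumulative sum stays 0 strictly before the first match.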
best_index_vector = (final_wcss_successful == best_wcss);
aggr_best_index_vector = cumsum (best_index_vector);
best_index = as.integer (sum (aggr_best_index_vector == 0) + 1);
if(is_verbose)
print ("Successful runs: Best run is " + best_index + " with Centroid WCSS = " + best_wcss
+ "; Avg WCSS = " + avg_wcss + "; Worst WCSS = " + worst_wcss);
C = All_Centroids [(num_centroids * (best_index - 1) + 1) : (num_centroids * best_index), ];
D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2));
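# Assign each record to its closest centroid; the transposed-cumsum trick
# below picks the first (lowest-id) centroid whenever several are equally close.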
P = (D <= rowMins (D));
aggr_P = t(cumsum (t(P)));
Y = rowSums (aggr_P == 0) + 1
if(is_verbose)
print("dim C=" + nrow(C) + "x" + ncol(C) + ", dim Y=" + nrow(Y) + "x" + ncol(Y))
}
else{
print ("K-means: No output is produced. Try increasing the number of iterations and/or lower eps.");
C = matrix(0, num_centroids, num_records)
Y = matrix(-1, 1, num_records)
}
}
get_sample_maps = function (int num_records, int num_samples, int approx_sample_size)
return (Matrix[double] sample_maps, Matrix[double] sample_col_map, int sample_block_size)
{
if (approx_sample_size < num_records)
{
# Input value "approx_sample_size" is the average sample size; increase it by ~10 std.dev's
# to get the sample block size (to allocate space):
sample_block_size = as.integer (approx_sample_size + round (10 * sqrt (approx_sample_size)));
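# (The realized sample size is approximately Binomial with mean and variance
# close to approx_sample_size, so a 10-std.dev. margin makes overflowing the
# allocated block vanishingly unlikely.)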
num_rows = sample_block_size * num_samples;
# Generate all samples in parallel by converting uniform random values into random
# integer skip-ahead intervals and prefix-summing them:
sample_rec_ids = Rand (rows = sample_block_size, cols = num_samples, min = 0.0, max = 1.0);
sample_rec_ids = round (log (sample_rec_ids) / log (1.0 - approx_sample_size / num_records) + 0.5);
# Prob [k-1 < log(uniform)/log(1-p) < k] = p*(1-p)^(k-1) = Prob [k-1 zeros before a one]
sample_rec_ids = cumsum (sample_rec_ids); # (skip to next one) --> (skip to i-th one)
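# With p = approx_sample_size / num_records, the skip lengths are i.i.d.
# Geometric(p), so each record is included with probability ~ p and the
# expected sample size is ~ approx_sample_size.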
# Replace all sample record ids over "num_records" (i.e. out of range) by "num_records + 1":
is_sample_rec_id_within_range = (sample_rec_ids <= num_records);
sample_rec_ids = sample_rec_ids * is_sample_rec_id_within_range
+ (num_records + 1) * (1 - is_sample_rec_id_within_range);
# Rearrange all samples (and their out-of-range indicators) into one column-vector:
sample_rec_ids = matrix (sample_rec_ids, rows = num_rows, cols = 1, byrow = FALSE);
is_row_in_samples = matrix (is_sample_rec_id_within_range, rows = num_rows, cols = 1, byrow = FALSE);
# Use contingency table to create the "sample_maps" matrix that is a vertical concatenation
# of 0-1-matrices, one per sample, each with 1s at (i, sample_record[i]) and 0s elsewhere:
sample_maps = table (seq (1, num_rows), sample_rec_ids, num_rows, num_records);
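# (The explicit output dimensions crop the out-of-range ids num_records + 1,
# so the corresponding padding rows of sample_maps remain all-zero.)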
# Create a 0-1-matrix that maps each sample column ID into all row positions of the
# corresponding sample; map out-of-sample-range positions to row id = num_rows + 1:
sample_positions = (num_rows + 1) - is_row_in_samples * seq (num_rows, 1, -1);
# Column ID positions = 1, 1, ..., 1, 2, 2, ..., 2, . . . , n_c, n_c, ..., n_c:
col_positions = round (0.5 + seq (0, num_rows - 1, 1) / sample_block_size);
sample_col_map = table (sample_positions, col_positions);
# Remove the out-of-sample-range positions by cutting off the last row:
sample_col_map = sample_col_map [1 : (num_rows), ];
}
else {
one_per_record = matrix (1, num_records, 1);
sample_block_size = num_records;
sample_maps = matrix (0, (num_records * num_samples), num_records);
sample_col_map = matrix (0, (num_records * num_samples), num_samples);
for (i in 1:num_samples) {
sample_maps [(num_records * (i - 1) + 1) : (num_records * i), ] = diag (one_per_record);
sample_col_map [(num_records * (i - 1) + 1) : (num_records * i), i] = one_per_record;
}
}
}