blob: 87174737ced2abb790e5436fa2f21fb56d780eb9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Implements the k-Means clustering algorithm
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# X String --- Location to read matrix X with the input data records
# k Int --- Number of centroids
# runs Int 10 Number of runs (with different initial centroids)
# maxi Int 1000 Maximum number of iterations per run
# tol Double 0.000001 Tolerance (epsilon) for WCSS change ratio
# samp Int 50 Average number of records per centroid in data samples
# C String "C.mtx" Location to store the output matrix with the centroids
# isY Int 0 0 = do not write Y, 1 = write Y
# Y String "Y.mtx" Location to store the mapping of records to centroids
# fmt String "text" Matrix output format, usually "text" or "csv"
# verb Int 0 0 = do not print per-iteration stats, 1 = print them
# ----------------------------------------------------------------------------
# Example:
# hadoop jar SystemML.jar -f Kmeans.dml -nvargs X=X.mtx k=5 C=centroids.mtx
# hadoop jar SystemML.jar -f Kmeans.dml -nvargs X=X.mtx k=5 runs=100 maxi=5000 tol=0.00000001 samp=20 C=centroids.mtx isY=1 Y=clusters.mtx verb=1
fileX = $X;
fileY = ifdef ($Y, "Y.mtx");
fileC = ifdef ($C, "C.mtx");
num_centroids = $k;
num_runs = ifdef ($runs, 10); # $runs=10;
max_iter = ifdef ($maxi, 1000); # $maxi=1000;
eps = ifdef ($tol, 0.000001); # $tol=0.000001;
is_write_Y = ifdef ($isY, 0); # $isY=0;
is_verbose = ifdef ($verb, 0); # $verb=0;
fmtCY = ifdef ($fmt, "text"); # $fmt="text";
avg_sample_size_per_centroid = ifdef ($samp, 50); # $samp=50;
print ("Reading X...");
# X : matrix of data points as rows
X = read (fileX);
num_records = nrow (X);
num_features = ncol (X);
sumXsq = sum (X ^ 2);
# Remark - A useful rewrite: sum (A %*% B) = sum (t(colSums(A)) * rowSums(B))
print ("Taking data samples for initialization...");
[sample_maps, samples_vs_runs_map, sample_block_size] =
get_sample_maps (num_records, num_runs, num_centroids * avg_sample_size_per_centroid);
is_row_in_samples = rowSums (sample_maps);
X_samples = sample_maps %*% X;
X_samples_sq_norms = rowSums (X_samples ^ 2);
print ("Initializing the centroids for all runs...");
All_Centroids = matrix (0, rows = (num_runs * num_centroids), cols = num_features);
# We select centroids according to the k-Means++ heuristic applied to a sample of X
# Loop invariant: min_distances ~ sq.distances from X_sample rows to nearest centroids,
# with the out-of-range X_sample positions in min_distances set to 0.0
min_distances = is_row_in_samples; # Pick the 1-st centroids uniformly at random
for (i in 1 : num_centroids)
# "Matricize" and prefix-sum to compute the cumulative distribution function:
min_distances_matrix_form =
matrix (min_distances, rows = sample_block_size, cols = num_runs, byrow = FALSE);
cdf_min_distances = cumsum (min_distances_matrix_form);
# Select the i-th centroid in each sample as a random sample row id with
# probability ~ min_distances:
random_row = Rand (rows = 1, cols = num_runs, min = 0.0, max = 1.0);
threshold_matrix = random_row * cdf_min_distances [sample_block_size, ];
centroid_ids = t(colSums (ppred (cdf_min_distances, threshold_matrix, "<"))) + 1;
# Place the selected centroids together, one per run, into a matrix:
centroid_placer = matrix (0, rows = num_runs, cols = (sample_block_size * num_runs));
centroid_placer_raw =
table (seq (1, num_runs, 1), sample_block_size * seq (0, num_runs - 1, 1) + centroid_ids);
centroid_placer [, 1 : ncol (centroid_placer_raw)] = centroid_placer_raw;
centroids = centroid_placer %*% X_samples;
# Place the selected centroids into their appropriate slots in All_Centroids:
centroid_placer = matrix (0, rows = nrow (All_Centroids), cols = num_runs);
centroid_placer_raw =
table (seq (i, num_centroids * (num_runs - 1) + i, num_centroids), seq (1, num_runs, 1));
centroid_placer [1 : nrow (centroid_placer_raw), ] = centroid_placer_raw;
All_Centroids = All_Centroids + centroid_placer %*% centroids;
# Update min_distances to preserve the loop invariant:
distances = X_samples_sq_norms + samples_vs_runs_map %*% rowSums (centroids ^ 2)
- 2 * rowSums (X_samples * (samples_vs_runs_map %*% centroids));
if (i == 1) {
min_distances = is_row_in_samples * distances;
} else {
min_distances = min (min_distances, distances);
} }
termination_code = matrix (0, rows = num_runs, cols = 1);
final_wcss = matrix (0, rows = num_runs, cols = 1);
num_iterations = matrix (0, rows = num_runs, cols = 1);
print ("Performing k-means iterations for all runs...");
parfor (run_index in 1 : num_runs, check = 0)
C = All_Centroids [(num_centroids * (run_index - 1) + 1) : (num_centroids * run_index), ];
C_old = C;
iter_count = 0;
term_code = 0;
wcss = 0;
while (term_code == 0)
# Compute Euclidean squared distances from records (X rows) to centroids (C rows)
# without the C-independent term, then take the minimum for each record
D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2));
minD = rowMins (D);
# Compute the current centroid-based within-cluster sum of squares (WCSS)
wcss_old = wcss;
wcss = sumXsq + sum (minD);
if (is_verbose == 1) {
if (iter_count == 0) {
print ("Run " + run_index + ", At Start-Up: Centroid WCSS = " + wcss);
} else {
print ("Run " + run_index + ", Iteration " + iter_count + ": Centroid WCSS = " + wcss
+ "; Centroid change (avg.sq.dist.) = " + (sum ((C - C_old) ^ 2) / num_centroids));
} }
# Check if convergence or maximum iteration has been reached
if (wcss_old - wcss < eps * wcss & iter_count > 0) {
term_code = 1; # Convergence is reached
} else {
if (iter_count >= max_iter) {
term_code = 2; # Maximum iteration is reached
} else {
iter_count = iter_count + 1;
# Find the closest centroid for each record
P = ppred (D, minD, "<=");
# If some records belong to multiple centroids, share them equally
P = P / rowSums (P);
# Compute the column normalization factor for P
P_denom = colSums (P);
if (sum (ppred (P_denom, 0.0, "<=")) > 0) {
term_code = 3; # There is a "runaway" centroid with 0.0 denominator
} else {
C_old = C;
# Compute new centroids as weighted averages over the records
C = (t(P) %*% X) / t(P_denom);
} } } }
print ("Run " + run_index + ", Iteration " + iter_count + ": Terminated with code = " + term_code + ", Centroid WCSS = " + wcss);
All_Centroids [(num_centroids * (run_index - 1) + 1) : (num_centroids * run_index), ] = C;
final_wcss [run_index, 1] = wcss;
termination_code [run_index, 1] = term_code;
num_iterations [run_index, 1] = iter_count;
termination_bitmap = matrix (0, rows = num_runs, cols = 3);
termination_bitmap_raw = table (seq (1, num_runs, 1), termination_code);
termination_bitmap [, 1 : ncol(termination_bitmap_raw)] = termination_bitmap_raw;
termination_stats = colSums (termination_bitmap);
print ("Number of successful runs = " + as.integer (as.scalar (termination_stats [1, 1])));
print ("Number of incomplete runs = " + as.integer (as.scalar (termination_stats [1, 2])));
print ("Number of failed runs (with lost centroids) = " + as.integer (as.scalar (termination_stats [1, 3])));
num_successful_runs = as.scalar (termination_stats [1, 1]);
if (num_successful_runs > 0) {
final_wcss_successful = final_wcss * termination_bitmap [, 1];
worst_wcss = max (final_wcss_successful);
best_wcss = min (final_wcss_successful + (10 * worst_wcss + 10) * (1 - termination_bitmap [, 1]));
avg_wcss = sum (final_wcss_successful) / num_successful_runs;
best_index_vector = ppred (final_wcss_successful, best_wcss, "==");
aggr_best_index_vector = cumsum (best_index_vector);
best_index = as.integer (sum (ppred (aggr_best_index_vector, 0, "==")) + 1);
print ("Successful runs: Best run is " + best_index + " with Centroid WCSS = " + best_wcss
+ "; Avg WCSS = " + avg_wcss + "; Worst WCSS = " + worst_wcss);
C = All_Centroids [(num_centroids * (best_index - 1) + 1) : (num_centroids * best_index), ];
print ("Writing out the best-WCSS centroids...");
write (C, fileC, format=fmtCY);
if (is_write_Y == 1) {
print ("Writing out the best-WCSS cluster labels...");
D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2));
P = ppred (D, rowMins (D), "<=");
aggr_P = t(cumsum (t(P)));
Y = rowSums (ppred (aggr_P, 0, "==")) + 1
write (Y, fileY, format=fmtCY);
print ("DONE.");
} else {
stop ("No output is produced. Try increasing the number of iterations and/or runs.");
get_sample_maps = function (int num_records, int num_samples, int approx_sample_size)
return (Matrix[double] sample_maps, Matrix[double] sample_col_map, int sample_block_size)
if (approx_sample_size < num_records) {
# Input value "approx_sample_size" is the average sample size; increase it by ~10's
# to get the sample block size (to allocate space):
sample_block_size = as.integer (approx_sample_size + round (10 * sqrt (approx_sample_size)));
num_rows = sample_block_size * num_samples;
# Generate all samples in parallel by converting uniform random values into random
# integer skip-ahead intervals and prefix-summing them:
sample_rec_ids = Rand (rows = sample_block_size, cols = num_samples, min = 0.0, max = 1.0);
sample_rec_ids = round (log (sample_rec_ids) / log (1.0 - approx_sample_size / num_records) + 0.5);
# Prob [k-1 < log(uniform)/log(1-p) < k] = p*(1-p)^(k-1) = Prob [k-1 zeros before a one]
sample_rec_ids = cumsum (sample_rec_ids); # (skip to next one) --> (skip to i-th one)
# Replace all sample record ids over "num_records" (i.e. out of range) by "num_records + 1":
is_sample_rec_id_within_range = ppred (sample_rec_ids, num_records, "<=");
sample_rec_ids = sample_rec_ids * is_sample_rec_id_within_range
+ (num_records + 1) * (1 - is_sample_rec_id_within_range);
# Rearrange all samples (and their out-of-range indicators) into one column-vector:
sample_rec_ids =
matrix (sample_rec_ids, rows = num_rows, cols = 1, byrow = FALSE);
is_row_in_samples =
matrix (is_sample_rec_id_within_range, rows = num_rows, cols = 1, byrow = FALSE);
# Use contingency table to create the "sample_maps" matrix that is a vertical concatenation
# of 0-1-matrices, one per sample, each with 1s at (i, sample_record[i]) and 0s elsewhere:
sample_maps_raw = table (seq (1, num_rows), sample_rec_ids);
max_rec_id = ncol (sample_maps_raw);
if (max_rec_id >= num_records) {
sample_maps = sample_maps_raw [, 1 : num_records];
} else {
sample_maps = matrix (0, rows = num_rows, cols = num_records);
sample_maps [, 1 : max_rec_id] = sample_maps_raw;
# Create a 0-1-matrix that maps each sample column ID into all row positions of the
# corresponding sample; map out-of-sample-range positions to row id = num_rows + 1:
sample_positions = (num_rows + 1) - is_row_in_samples * seq (num_rows, 1, -1);
# Column ID positions = 1, 1, ..., 1, 2, 2, ..., 2, . . . , n_c, n_c, ..., n_c:
col_positions = round (0.5 + seq (0, num_rows - 1, 1) / sample_block_size);
sample_col_map = table (sample_positions, col_positions);
# Remove the out-of-sample-range positions by cutting off the last row:
sample_col_map = sample_col_map [1 : (num_rows), ];
} else {
one_per_record = matrix (1, rows = num_records, cols = 1);
sample_block_size = num_records;
sample_maps = matrix (0, rows = (num_records * num_samples), cols = num_records);
sample_col_map = matrix (0, rows = (num_records * num_samples), cols = num_samples);
for (i in 1:num_samples) {
sample_maps [(num_records * (i - 1) + 1) : (num_records * i), ] = diag (one_per_record);
sample_col_map [(num_records * (i - 1) + 1) : (num_records * i), i] = one_per_record;
} } }