| #------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| #------------------------------------------------------------- |
| |
| # Builtin function that implements the k-Means clustering algorithm |
| # |
| # INPUT PARAMETERS: |
| # ---------------------------------------------------------------------------- |
| # NAME TYPE DEFAULT MEANING |
| # ---------------------------------------------------------------------------- |
| # X Double --- The input Matrix to do KMeans on. |
| # k Int --- Number of centroids |
| # runs Int 10 Number of runs (with different initial centroids) |
| # max_iter Int 1000 Maximum number of iterations per run |
| # eps Double 0.000001 Tolerance (epsilon) for WCSS change ratio |
| # is_verbose Boolean FALSE do not print per-iteration stats |
| # avg_sample_size_per_centroid Int 50 Average number of records per centroid in data samples |
| # |
| # |
| # RETURN VALUES |
| # ---------------------------------------------------------------------------- |
| # NAME TYPE DEFAULT MEANING |
| # ---------------------------------------------------------------------------- |
| # Y String "Y.mtx" The mapping of records to centroids |
| # C String "C.mtx" The output matrix with the centroids |
| # ---------------------------------------------------------------------------- |
| |
| |
| m_kmeans = function(Matrix[Double] X, Integer k = 10, Integer runs = 10, Integer max_iter = 1000, |
| Double eps = 0.000001, Boolean is_verbose = FALSE, Integer avg_sample_size_per_centroid = 50) |
| return (Matrix[Double] C, Matrix[Double] Y) |
| { |
| print ("BEGIN K-MEANS SCRIPT"); |
| num_records = nrow (X); |
| num_features = ncol (X); |
| num_centroids = k; |
| num_runs = runs; |
| |
| if(is_verbose == TRUE) |
| print("dim X=" + nrow(X) + "x" + ncol(X)) |
| |
| sumXsq = sum (X ^ 2); |
| |
| # STEP 1: INITIALIZE CENTROIDS FOR ALL RUNS FROM DATA SAMPLES: |
| |
| print ("Taking data samples for initialization..."); |
| |
| [sample_maps, samples_vs_runs_map, sample_block_size] = get_sample_maps( |
| num_records, num_runs, num_centroids * avg_sample_size_per_centroid); |
| |
| is_row_in_samples = rowSums (sample_maps); |
| X_samples = sample_maps %*% X; |
| X_samples_sq_norms = rowSums (X_samples ^ 2); |
| |
| print ("Initializing the centroids for all runs..."); |
| All_Centroids = matrix (0, num_runs * num_centroids, num_features); |
| |
| # We select centroids according to the k-Means++ heuristic applied to a sample of X |
| # Loop invariant: min_distances ~ sq.distances from X_sample rows to nearest centroids, |
| # with the out-of-range X_sample positions in min_distances set to 0.0 |
| |
| min_distances = is_row_in_samples; # Pick the 1-st centroids uniformly at random |
| |
| for (i in 1 : num_centroids) |
| { |
| # "Matricize" and prefix-sum to compute the cumulative distribution function: |
| min_distances_matrix_form = matrix (min_distances, rows = sample_block_size, cols = num_runs, byrow = FALSE); |
| cdf_min_distances = cumsum (min_distances_matrix_form); |
| |
| # Select the i-th centroid in each sample as a random sample row id with |
| # probability ~ min_distances: |
| random_row = rand(rows = 1, cols = num_runs); |
| threshold_matrix = random_row * cdf_min_distances [sample_block_size, ]; |
| centroid_ids = t(colSums (cdf_min_distances < threshold_matrix)) + 1; |
| |
| # Place the selected centroids together, one per run, into a matrix: |
| centroid_placer = table (seq (1, num_runs), |
| sample_block_size * seq (0, num_runs - 1) + centroid_ids, num_runs, sample_block_size * num_runs); |
| centroids = centroid_placer %*% X_samples; |
| |
| # Place the selected centroids into their appropriate slots in All_Centroids: |
| centroid_placer = table (seq (i, num_centroids * (num_runs - 1) + i, num_centroids), |
| seq (1, num_runs, 1), nrow (All_Centroids), num_runs); |
| All_Centroids = All_Centroids + centroid_placer %*% centroids; |
| |
| # Update min_distances to preserve the loop invariant: |
| distances = X_samples_sq_norms + samples_vs_runs_map %*% rowSums (centroids ^ 2) |
| - 2 * rowSums (X_samples * (samples_vs_runs_map %*% centroids)); |
| min_distances = ifelse(i==1, is_row_in_samples*distances, min(min_distances,distances)); |
| } |
| |
| # STEP 2: PERFORM K-MEANS ITERATIONS FOR ALL RUNS: |
| |
| termination_code = matrix (0, rows = num_runs, cols = 1); |
| final_wcss = matrix (0, rows = num_runs, cols = 1); |
| num_iterations = matrix (0, rows = num_runs, cols = 1); |
| |
| print ("Performing k-means iterations for all runs..."); |
| |
| parfor (run_index in 1 : num_runs, check = 0) |
| { |
| C = All_Centroids [(num_centroids * (run_index - 1) + 1) : (num_centroids * run_index), ]; |
| C_old = C; |
| iter_count = 0; |
| term_code = 0; |
| wcss = Inf |
| |
| while (term_code == 0) |
| { |
| # Compute Euclidean squared distances from records (X rows) to centroids (C rows) |
| # without the C-independent term, then take the minimum for each record |
| D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2)); |
| minD = rowMins (D); |
| # Compute the current centroid-based within-cluster sum of squares (WCSS) |
| wcss_old = wcss; |
| wcss = sumXsq + sum (minD); |
| if( is_verbose ) { |
| if (iter_count == 0) |
| print ("Run " + run_index + ", At Start-Up: Centroid WCSS = " + wcss); |
| else |
| print ("Run " + run_index + ", Iteration " + iter_count + ": Centroid WCSS = " + wcss |
| + "; Centroid change (avg.sq.dist.) = " + (sum ((C - C_old) ^ 2) / num_centroids)); |
| } |
| |
| # Find the closest centroid for each record |
| P = D <= minD; |
| # If some records belong to multiple centroids, share them equally |
| P = P / rowSums (P); |
| # Compute the column normalization factor for P |
| P_denom = colSums (P); |
| # Compute new centroids as weighted averages over the records |
| C_new = (t(P) %*% X) / t(P_denom); |
| |
| # Check if convergence or maximum iteration has been reached |
| iter_count = iter_count + 1 |
| if(wcss_old - wcss < eps * wcss) |
| term_code = 1; # Convergence reached |
| else if(iter_count >= max_iter) |
| term_code = 2; # Max iteration reached |
| else if(sum (P_denom <= 0) > 0) |
| term_code = 3; # "Runaway" centroid (0.0 denom) |
| else |
| C_old = C; C = C_new; |
| } |
| |
| if(is_verbose == TRUE) |
| print ("Run " + run_index + ", Iteration " + iter_count + ": Terminated with code = " |
| + term_code + ", Centroid WCSS = " + wcss); |
| |
| All_Centroids [(num_centroids * (run_index - 1) + 1) : (num_centroids * run_index), ] = C; |
| final_wcss [run_index, 1] = wcss; |
| termination_code [run_index, 1] = term_code; |
| num_iterations [run_index, 1] = iter_count; |
| } |
| |
| # STEP 3: SELECT THE RUN WITH BEST CENTROID-WCSS AND OUTPUT ITS CENTROIDS: |
| |
| termination_bitmap = matrix (0, num_runs, 3); |
| termination_bitmap_raw = table (seq (1, num_runs, 1), termination_code); |
| termination_bitmap [, 1 : ncol(termination_bitmap_raw)] = termination_bitmap_raw; |
| termination_stats = colSums (termination_bitmap); |
| |
| print ("Number of successful runs = " + as.integer (as.scalar (termination_stats [1, 1]))); |
| print ("Number of incomplete runs = " + as.integer (as.scalar (termination_stats [1, 2]))); |
| print ("Number of failed runs (with lost centroids) = " + as.integer (as.scalar (termination_stats [1, 3]))); |
| |
| num_successful_runs = as.scalar (termination_stats [1, 1]); |
| |
| if (num_successful_runs > 0) |
| { |
| final_wcss_successful = replace(target = final_wcss, pattern = Inf, replacement = 0)* termination_bitmap [, 1]; |
| worst_wcss = max (final_wcss_successful); |
| best_wcss = min (final_wcss_successful + (10 * worst_wcss + 10) * (1 - termination_bitmap [, 1])); |
| avg_wcss = sum (final_wcss_successful) / num_successful_runs; |
| best_index_vector = (final_wcss_successful == best_wcss); |
| aggr_best_index_vector = cumsum (best_index_vector); |
| best_index = as.integer (sum (aggr_best_index_vector == 0) + 1); |
| |
| print ("Successful runs: Best run is " + best_index + " with Centroid WCSS = " + best_wcss |
| + "; Avg WCSS = " + avg_wcss + "; Worst WCSS = " + worst_wcss); |
| |
| C = All_Centroids [(num_centroids * (best_index - 1) + 1) : (num_centroids * best_index), ]; |
| D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2)); |
| P = (D <= rowMins (D)); |
| aggr_P = t(cumsum (t(P))); |
| Y = rowSums (aggr_P == 0) + 1 |
| |
| if(is_verbose == TRUE) |
| print("dim C=" + nrow(C) + "x" + ncol(C) + ", dim Y=" + nrow(Y) + "x" + ncol(Y)) |
| print ("DONE."); |
| } |
| else |
| stop ("No output is produced. Try increasing the number of iterations and/or runs."); |
| |
| } |
| |
| |
| get_sample_maps = function (int num_records, int num_samples, int approx_sample_size) |
| return (Matrix[double] sample_maps, Matrix[double] sample_col_map, int sample_block_size) |
| { |
| if (approx_sample_size < num_records) |
| { |
| # Input value "approx_sample_size" is the average sample size; increase it by ~10 std.dev's |
| # to get the sample block size (to allocate space): |
| sample_block_size = as.integer (approx_sample_size + round (10 * sqrt (approx_sample_size))); |
| num_rows = sample_block_size * num_samples; |
| |
| # Generate all samples in parallel by converting uniform random values into random |
| # integer skip-ahead intervals and prefix-summing them: |
| sample_rec_ids = Rand (rows = sample_block_size, cols = num_samples, min = 0.0, max = 1.0); |
| sample_rec_ids = round (log (sample_rec_ids) / log (1.0 - approx_sample_size / num_records) + 0.5); |
| # Prob [k-1 < log(uniform)/log(1-p) < k] = p*(1-p)^(k-1) = Prob [k-1 zeros before a one] |
| sample_rec_ids = cumsum (sample_rec_ids); # (skip to next one) --> (skip to i-th one) |
| |
| # Replace all sample record ids over "num_records" (i.e. out of range) by "num_records + 1": |
| is_sample_rec_id_within_range = (sample_rec_ids <= num_records); |
| sample_rec_ids = sample_rec_ids * is_sample_rec_id_within_range |
| + (num_records + 1) * (1 - is_sample_rec_id_within_range); |
| |
| # Rearrange all samples (and their out-of-range indicators) into one column-vector: |
| sample_rec_ids = matrix (sample_rec_ids, rows = num_rows, cols = 1, byrow = FALSE); |
| is_row_in_samples = matrix (is_sample_rec_id_within_range, rows = num_rows, cols = 1, byrow = FALSE); |
| |
| # Use contingency table to create the "sample_maps" matrix that is a vertical concatenation |
| # of 0-1-matrices, one per sample, each with 1s at (i, sample_record[i]) and 0s elsewhere: |
| sample_maps = table (seq (1, num_rows), sample_rec_ids, num_rows, num_records); |
| |
| # Create a 0-1-matrix that maps each sample column ID into all row positions of the |
| # corresponding sample; map out-of-sample-range positions to row id = num_rows + 1: |
| sample_positions = (num_rows + 1) - is_row_in_samples * seq (num_rows, 1, -1); |
| # Column ID positions = 1, 1, ..., 1, 2, 2, ..., 2, . . . , n_c, n_c, ..., n_c: |
| col_positions = round (0.5 + seq (0, num_rows - 1, 1) / sample_block_size); |
| sample_col_map = table (sample_positions, col_positions); |
| # Remove the out-of-sample-range positions by cutting off the last row: |
| sample_col_map = sample_col_map [1 : (num_rows), ]; |
| } |
| else { |
| one_per_record = matrix (1, num_records, 1); |
| sample_block_size = num_records; |
| sample_maps = matrix (0, (num_records * num_samples), num_records); |
| sample_col_map = matrix (0, (num_records * num_samples), num_samples); |
| for (i in 1:num_samples) { |
| sample_maps [(num_records * (i - 1) + 1) : (num_records * i), ] = diag (one_per_record); |
| sample_col_map [(num_records * (i - 1) + 1) : (num_records * i), i] = one_per_record; |
| } |
| } |
| } |