| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import plpy |
| |
| from utilities.control import MinWarning |
| from utilities.utilities import _assert |
| from utilities.utilities import unique_string |
| from utilities.utilities import add_postfix |
| from utilities.utilities import INTEGER, NUMERIC, ONLY_ARRAY |
| from utilities.utilities import is_valid_psql_type |
| from utilities.utilities import is_platform_pg |
| from utilities.utilities import num_features |
| from utilities.utilities import get_seg_number |
| from utilities.validate_args import input_tbl_valid, output_tbl_valid |
| from utilities.validate_args import is_var_valid |
| from utilities.validate_args import cols_in_tbl_valid |
| from utilities.validate_args import get_expr_type |
| from utilities.validate_args import get_algorithm_name |
| from graph.wcc import wcc |
| |
| from math import log |
| from math import ceil |
| from math import sqrt |
| from time import time |
| import numpy as np |
| from collections import deque |
| |
| import utilities.debug as DEBUG |
| DEBUG.plpy_info_enabled = False |
| DEBUG.plpy_execute_enabled = False |
| DEBUG.timings_enabled = False |
| |
| try: |
| from rtree import index |
| except ImportError: |
    RTREE_ENABLED = 0
| else: |
    RTREE_ENABLED = 1
| |
| METHOD_BRUTE_FORCE = 'brute_force' |
| METHOD_OPTIMIZED = 'optimized' |
| DEFAULT_MIN_SAMPLES = 5 |
| DEFAULT_METRIC = 'squared_dist_norm2' |
| |
| def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, |
| eps, min_samples, metric, algorithm, max_segmentation_depth, **kwargs): |
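    """
    Entry point for the dbscan() SQL interface.

    Validates the arguments, creates a view normalizing the input to
    (id BIGINT, point DOUBLE PRECISION[]), and dispatches to either the
    brute-force SQL implementation or DBSCAN_optimized, writing the
    labelled points to output_table and the parameters to a _summary table.
    """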
| |
| with MinWarning("warning"): |
| # algorithm=None is handled in get_algorithm_name() |
| min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples |
| metric = DEFAULT_METRIC if not metric else metric |
| num_segs = get_seg_number() |
| |
| algorithm = get_algorithm_name(algorithm, METHOD_OPTIMIZED, |
| [METHOD_BRUTE_FORCE, METHOD_OPTIMIZED], 'DBSCAN') |
| |
| if max_segmentation_depth is None: |
| # Default to num_segs |
| max_depth = num_segs |
| else: |
| max_depth = max_segmentation_depth |
| if algorithm != METHOD_OPTIMIZED: |
| plpy.warning("Ignoring max_segmentation_depth={} param, " |
| "N/A to algorithm={}".format(max_depth, algorithm)) |
| |
| _validate_dbscan(schema_madlib, source_table, output_table, id_column, |
| expr_point, eps, min_samples, metric, algorithm, max_depth) |
| |
| dist_by_src = '' if is_platform_pg() else 'DISTRIBUTED BY (__src__)' |
| dist_by_id = '' if is_platform_pg() else 'DISTRIBUTED BY (id)' |
| dist_by_reach_id = '' if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)' |
| dist_by_seg_id = '' if is_platform_pg() else 'DISTRIBUTED BY (seg_id)' |
| distributed_by = '' if is_platform_pg() else 'DISTRIBUTED BY (__dist_id__)' |
| |
| core_points_table = unique_string(desp='core_points_table') |
| core_edge_table = unique_string(desp='core_edge_table') |
| distance_table = unique_string(desp='distance_table') |
| |
| source_view = unique_string(desp='source_view') |
| plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view)) |
| sql = """ |
| CREATE VIEW {source_view} AS |
| SELECT ({id_column})::BIGINT AS id, ({expr_point})::DOUBLE PRECISION[] AS point |
| FROM {source_table} |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| if algorithm == METHOD_OPTIMIZED: |
| dbscan_opt = DBSCAN_optimized(schema_madlib, source_view, output_table, |
| eps, min_samples, metric, max_depth) |
| dbscan_opt.run() |
| else: |
| # Calculate pairwise distances |
| sql = """ |
| CREATE TEMP TABLE {distance_table} AS |
| SELECT __src__, __dest__ FROM ( |
| SELECT __t1__.id AS __src__, |
| __t2__.id AS __dest__, |
| {schema_madlib}.{metric}( |
| __t1__.point, __t2__.point) AS __dist__ |
| FROM {source_view} AS __t1__, {source_view} AS __t2__)q1 |
| WHERE __dist__ < {eps} |
| {dist_by_src} |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| # Find core points |
| sql = """ |
| CREATE TEMP TABLE {core_points_table} AS |
| SELECT * FROM ( |
| SELECT __src__ AS id, count(*) AS __count__ |
| FROM {distance_table} GROUP BY __src__) q1 |
| WHERE __count__ >= {min_samples} |
| {dist_by_id} |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| # Find the connections between core points to form the clusters |
| sql = """ |
| CREATE TEMP TABLE {core_edge_table} AS |
| SELECT __src__, __dest__ |
| FROM {distance_table} AS __t1__, (SELECT array_agg(id) AS arr |
| FROM {core_points_table}) __t2__ |
| WHERE __t1__.__src__ = ANY(arr) AND __t1__.__dest__ = ANY(arr) |
| {dist_by_src} |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| sql = """ |
| SELECT count(*) FROM {core_points_table} |
| """.format(**locals()) |
| core_count = plpy.execute(sql)[0]['count'] |
| _assert(core_count != 0, "DBSCAN: Cannot find any core points/clusters.") |
| |
| # Start snowflake creation |
| if is_platform_pg(): |
| sql = """ |
| SELECT count(*) FROM {core_edge_table} |
| """.format(**locals()) |
| count = plpy.execute(sql)[0]['count'] |
| |
| counts_list = [int(count)] |
| seg_id = 0 |
| group_by_clause = '' |
| |
| else: |
| sql = """ |
| SELECT count(*), gp_segment_id FROM {core_edge_table} GROUP BY gp_segment_id |
| """.format(**locals()) |
| count_result = plpy.execute(sql) |
| seg_num = get_seg_number() |
| counts_list = [0]*seg_num |
| for i in count_result: |
| counts_list[int(i['gp_segment_id'])] = int(i['count']) |
| seg_id = 'gp_segment_id' |
| group_by_clause = 'GROUP BY gp_segment_id' |
| |
| snowflake_table = unique_string(desp='snowflake_table') |
| sf_edge_table = unique_string(desp='sf_edge_table') |
| |
| plpy.execute("DROP TABLE IF EXISTS {0}, {1}".format( |
| snowflake_table, sf_edge_table)) |
| sql = """ |
| CREATE TEMP TABLE {snowflake_table} AS |
| SELECT {seg_id}::INTEGER AS seg_id, |
| {schema_madlib}.build_snowflake_table( __src__, |
| __dest__, |
| ARRAY{counts_list}, |
| {seg_id} |
| ) AS __sf__ |
| FROM {core_edge_table} {group_by_clause} |
| {dist_by_seg_id} |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| sql = """ |
| CREATE TEMP TABLE {sf_edge_table} AS |
| SELECT seg_id, (edge).src AS __src__, (edge).dest AS __dest__ |
| FROM ( |
| SELECT seg_id, |
| {schema_madlib}.__dbscan_unpack_edge(__sf__) AS edge |
| FROM {snowflake_table} |
| ) q1 |
| {dist_by_seg_id} |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| # Run wcc to get the min id for each cluster |
| wcc(schema_madlib, core_points_table, 'id', sf_edge_table, |
| 'src=__src__, dest=__dest__', output_table, None) |
| |
| plpy.execute(""" |
| ALTER TABLE {0} |
| ADD COLUMN is_core_point BOOLEAN, |
| ADD COLUMN point DOUBLE PRECISION[]; |
| """.format(output_table) |
| ) |
| plpy.execute(""" |
| ALTER TABLE {0} |
| RENAME COLUMN component_id TO cluster_id; |
| """.format(output_table) |
| ) |
| plpy.execute(""" |
| UPDATE {0} |
| SET is_core_point = TRUE |
| """.format(output_table) |
| ) |
| |
| # Find reachable points |
| reachable_points_table = unique_string(desp='reachable_points_table') |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(reachable_points_table)) |
| sql = """ |
| CREATE TEMP TABLE {reachable_points_table} AS |
| SELECT array_agg(__src__) AS __src_list__, |
| __dest__ AS __reachable_id__ |
| FROM {distance_table} AS __t1__, |
| (SELECT array_agg(id) AS __arr__ |
| FROM {core_points_table}) __t2__ |
| WHERE __src__ = ANY(__arr__) AND __dest__ != ALL(__arr__) |
| GROUP BY __dest__ |
| {dist_by_reach_id} |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| sql = """ |
| INSERT INTO {output_table} |
| SELECT __reachable_id__, |
| cluster_id, |
| FALSE AS is_core_point, |
| NULL AS point |
| FROM {reachable_points_table} AS __t1__ JOIN |
| {output_table} AS __t2__ |
| ON (__src_list__[1] = id) |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| # Add features of points to the output table to use them for prediction |
| sql = """ |
| UPDATE {output_table} AS __t1__ |
| SET point = __t2__.point |
| FROM {source_view} AS __t2__ |
| WHERE __t1__.id = __t2__.id |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, {5}".format( |
| distance_table, core_points_table, core_edge_table, |
| reachable_points_table, snowflake_table, sf_edge_table)) |
| |
| plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view)) |
| |
| # -- Update the cluster ids to be consecutive -- |
| sql = """ |
| UPDATE {output_table} AS __t1__ |
| SET cluster_id = new_id-1 |
| FROM ( |
| SELECT cluster_id, row_number() OVER(ORDER BY cluster_id) AS new_id |
| FROM {output_table} |
| GROUP BY cluster_id) __t2__ |
| WHERE __t1__.cluster_id = __t2__.cluster_id |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
        # -- Generate output summary table --
| |
| output_summary_table = add_postfix(output_table, '_summary') |
| |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table)) |
| sql = """ |
| CREATE TABLE {output_summary_table} AS |
| SELECT $${id_column}$$::VARCHAR AS id_column, |
| $${expr_point}$$::VARCHAR AS expr_point, |
| {eps}::DOUBLE PRECISION AS eps, |
| {min_samples}::BIGINT AS min_points, |
| '{metric}'::VARCHAR AS metric |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| class DBSCAN(object): |
| def __init__(self, schema_madlib, source_table, output_table, |
| eps, min_samples, metric): |
| """ |
| Args: |
| @param schema_madlib Name of the Madlib Schema |
| @param source_table Training data table |
| @param output_table Output table of points labelled by cluster_id |
| @param eps The eps value defined by the user |
| @param min_samples min_samples defined by user |
| @param metric metric selected by user |
| |
| """ |
| self.schema_madlib = schema_madlib |
| self.source_table = source_table |
| self.output_table = output_table |
| self.n_dims = num_features(source_table, 'point') |
| self.distributed_by = '' if is_platform_pg() else 'DISTRIBUTED BY (__dist_id__)' |
| self.min_samples = min_samples |
| self.metric = metric |
| # If squared_dist_norm2 is used, we assume eps is set for the squared |
| # distance. That means the border width must be sqrt(eps) |
| self.eps = sqrt(eps) if metric == DEFAULT_METRIC else eps |
| |
| class DBSCAN_optimized(DBSCAN): |
| def __init__(self, schema_madlib, source_table, output_table, |
| eps, min_samples, metric, max_depth=None): |
| """ |
| Args: |
| @param schema_madlib Name of the Madlib Schema |
| @param source_table Training data table |
| @param output_table Output table of points labelled by cluster_id |
| @param eps The eps value defined by the user |
| @param min_samples min_samples defined by user |
| @param metric metric selected by user |
| """ |
| |
| DBSCAN.__init__(self, schema_madlib, source_table, output_table, |
| eps, min_samples, metric) |
| self.max_depth = max_depth |
| self.unique_str = unique_string('DBSCAN_opt') |
| self.num_segs = get_seg_number() |
| res = plpy.execute("SELECT COUNT(*) FROM {source_table}".format(**locals())) |
| self.num_points = res[0]['count'] |
| |
| def run(self): |
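        """
        Main driver for the optimized method: partition the points into
        per-segment leaves (build_optimized_tree), run __dbscan_leaf() on
        each leaf in parallel, make the per-leaf cluster ids globally
        unique, then merge clusters that share core points across leaf
        borders by running wcc on the inter-cluster edge table.
        """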
| unique_str = self.unique_str |
| distributed_by = self.distributed_by |
| schema_madlib = self.schema_madlib |
| |
| self.dist_map = 'dist_map' + unique_str |
| |
| segmented_source = 'segmented_source' + unique_str |
| self.segmented_source = segmented_source |
| self.build_optimized_tree() |
| |
| rowcount_table = 'rowcounts' + unique_str |
| leaf_clusters_table = 'leaf_clusters' + unique_str |
| |
| count_rows_sql = """ |
| DROP TABLE IF EXISTS {rowcount_table}; |
| CREATE TEMP TABLE {rowcount_table} AS |
| WITH i AS ( |
| SELECT |
| count(*) AS num_rows, |
| dist_id, |
| __dist_id__ |
| FROM internal_{segmented_source} |
| GROUP BY dist_id, __dist_id__ |
| ), e AS ( |
| SELECT |
| count(*) AS num_rows, |
| dist_id, |
| __dist_id__ |
| FROM external_{segmented_source} |
| GROUP BY dist_id, __dist_id__ |
| ) |
| SELECT |
| i.num_rows AS num_internal_rows, |
| COALESCE(e.num_rows,0) AS num_external_rows, |
| i.dist_id, |
| __dist_id__ |
| FROM i LEFT JOIN e |
| USING (dist_id, __dist_id__) |
| {distributed_by} |
| """.format(**locals()) |
| plpy.execute(count_rows_sql) |
| |
| # Run dbscan in parallel on each leaf |
| dbscan_leaves_sql = """ |
| CREATE TEMP TABLE {leaf_clusters_table} AS |
| WITH input AS ( |
| SELECT * |
| FROM internal_{segmented_source} |
| UNION ALL |
| SELECT * |
| FROM external_{segmented_source} |
| ), segmented_source AS ( |
| SELECT |
| ROW( |
| id, |
| point, |
| FALSE, |
| 0, |
| leaf_id, |
| dist_id |
| )::{schema_madlib}.__dbscan_record AS rec, |
| __dist_id__ |
| FROM input |
| ) |
| SELECT (output).*, __dist_id__ FROM ( |
| SELECT |
| {schema_madlib}.__dbscan_leaf( |
| rec, |
| {self.eps}, |
| {self.min_samples}, |
| '{self.metric}', |
| num_internal_rows, |
| num_external_rows |
| ) AS output, |
| __dist_id__ |
| FROM segmented_source JOIN {rowcount_table} |
| USING (__dist_id__) |
| ) a {distributed_by} |
| """.format(**locals()) |
| DEBUG.plpy_execute(dbscan_leaves_sql, report_segment_tracebacks=True) |
| |
| plpy.execute(""" |
| DROP TABLE IF EXISTS |
| internal_{segmented_source}, |
| external_{segmented_source} |
| """.format(**locals())) |
| |
| cluster_map = 'cluster_map' + unique_str |
| noise_point = label.NOISE_POINT |
| |
| # Replace local cluster_id's in each leaf with |
| # global cluster_id's which are unique across all leaves |
| gen_cluster_map_sql = """ |
| CREATE TEMP TABLE {cluster_map} AS |
| SELECT |
| dist_id, |
| __dist_id__, |
| cluster_id AS local_id, |
| ROW_NUMBER() OVER( |
| ORDER BY dist_id, cluster_id |
| ) AS global_id |
| FROM {leaf_clusters_table} |
| WHERE dist_id = leaf_id AND cluster_id != {noise_point} |
| GROUP BY __dist_id__, dist_id, cluster_id |
| {distributed_by} |
| """.format(**locals()) |
| plpy.execute(gen_cluster_map_sql) |
| |
| globalize_cluster_ids_sql = """ |
| UPDATE {leaf_clusters_table} lc |
| SET cluster_id = global_id |
| FROM {cluster_map} cm WHERE |
| cm.dist_id = lc.dist_id AND |
| cluster_id = local_id; |
| """.format(**locals()) |
| DEBUG.plpy.execute(globalize_cluster_ids_sql) |
| |
| intercluster_edges = 'intercluster_edges' + unique_str |
| dist_by_src = '' if is_platform_pg() else 'DISTRIBUTED BY (src)' |
| |
| # Build intercluster table of edges connecting clusters |
| # on different leaves |
| intercluster_edge_sql = """ |
| CREATE TEMP TABLE {intercluster_edges} AS |
| SELECT |
| internal.cluster_id AS src, |
| external.cluster_id AS dest |
| FROM {leaf_clusters_table} internal |
| JOIN {leaf_clusters_table} external |
| USING (id) |
| WHERE internal.is_core_point |
| AND internal.leaf_id = internal.dist_id |
| AND external.leaf_id != external.dist_id |
| GROUP BY src,dest |
| {dist_by_src} |
| """.format(**locals()) |
| DEBUG.plpy.execute(intercluster_edge_sql) |
| |
| res = plpy.execute("SELECT count(*) FROM {intercluster_edges}".format(**locals())) |
| |
| if int(res[0]['count']) > 0: |
| wcc_table = 'wcc' + unique_str |
| plpy.execute(""" |
| DROP TABLE IF EXISTS |
| {wcc_table}, {wcc_table}_summary, |
| {self.output_table} |
| """.format(**locals())) |
| |
| # Find connected components of intercluster_edge_table |
| wcc(schema_madlib, cluster_map, 'global_id', intercluster_edges, |
| 'src=src,dest=dest', wcc_table, None) |
| |
| # Rename each cluster_id to its component id in the intercluster graph |
| merge_clusters_sql = """ |
| UPDATE {leaf_clusters_table} lc |
| SET cluster_id = wcc.component_id |
| FROM {wcc_table} wcc |
| WHERE lc.cluster_id = wcc.global_id |
| """.format(**locals()) |
| plpy.execute(merge_clusters_sql) |
| |
| plpy.execute(""" |
| DROP TABLE IF EXISTS |
| {wcc_table}, {wcc_table}_summary |
| """.format(**locals())) |
| |
| add_crossborder_points_sql = """ |
| CREATE TABLE {self.output_table} AS |
| SELECT |
| id, point, cluster_id, is_core_point |
| FROM ( |
| SELECT |
| id, |
| point, |
| is_core_point, |
| leaf_id = dist_id AS is_internal, |
| dist_id, |
| CASE WHEN cluster_id = {noise_point} |
| THEN MAX(cluster_id) OVER(PARTITION BY id) |
| ELSE cluster_id |
| END |
| FROM {leaf_clusters_table} |
| ) x WHERE is_internal AND cluster_id != {noise_point} |
| """.format(**locals()) |
| DEBUG.plpy.execute(add_crossborder_points_sql) |
| |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {rowcount_table}, |
| {leaf_clusters_table}, {cluster_map}, |
| {intercluster_edges} |
| """.format(**locals())) |
| |
| def build_optimized_tree(self): |
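        """
        Partition the dataset into leaves for parallel processing.  The
        bounding box is cut along one dimension at a time, choosing the
        cut that minimizes __dbscan_segmentation_loss(); this is repeated
        for max_depth rounds.  The resulting leaves are mapped to
        segments through a __dist_id__ distribution table, producing two
        tables: the internal_* table with each leaf's own points and the
        external_* table with points from other leaves that fall within
        eps of a leaf's bounds.
        """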
| eps = self.eps |
| num_segs = self.num_segs |
| num_points = self.num_points |
| target = num_points / num_segs |
| source_table = self.source_table |
| leaf_bounds_table = 'leaf_bounds' + self.unique_str |
| max_depth = self.max_depth |
| n_dims = self.n_dims |
| dist_map = self.dist_map |
| distributed_by = self.distributed_by |
| internal_points = 'internal_' + self.segmented_source |
| external_points = 'external_' + self.segmented_source |
| |
| next_cut = 'next_cut' + self.unique_str |
| prev_node = 'prev_node' + self.unique_str |
| |
| first_cut_sql = """ |
| DROP TABLE IF EXISTS {prev_node}; |
| CREATE TEMP TABLE {prev_node} AS |
| WITH world_bounds AS ( |
| SELECT |
| i, |
| min(point[i]) AS lower, |
| max(point[i])+{eps} AS upper, |
| max(point[i])+{eps} - min(point[i]) AS size, |
| floor((max(point[i])+{eps} - min(point[i])) / {eps})::INT AS eps_bins |
| FROM generate_series(1,{n_dims}) AS i, {source_table} |
| GROUP BY i |
| ), |
| density_map AS ( |
| SELECT i, eps_bin, |
| eps_bins, |
| COALESCE(density, 0) AS density |
| FROM ( |
| SELECT i, eps_bins, |
| generate_series(0, eps_bins - 1) AS eps_bin |
| FROM world_bounds |
| ) g LEFT JOIN ( |
| SELECT i, |
| floor((point[i] - lower)/{eps})::INT AS eps_bin, |
| eps_bins, |
| COUNT(*) AS density |
| FROM |
| world_bounds, |
| {source_table} |
| GROUP BY i, eps_bin, eps_bins |
| ) a |
| USING (i, eps_bin, eps_bins) |
| ), |
| loss_table AS ( |
| SELECT i, eps_bin, |
| left_internal, |
| {num_points} as points_in_node, |
| left_external, |
| right_external, |
| {self.schema_madlib}.__dbscan_segmentation_loss( |
| left_internal, |
| {num_points} - left_internal, |
| left_external, |
| right_external, |
| {num_segs}::BIGINT, |
| V, |
| {eps}::DOUBLE PRECISION, |
| {n_dims}::BIGINT, |
| eps_bin::DOUBLE PRECISION, |
| eps_bins::DOUBLE PRECISION |
| ) AS losses |
| FROM ( |
| SELECT i, eps_bin, eps_bins, |
| {num_segs}::BIGINT AS num_segs, |
| COALESCE(density, 0) AS right_external, |
| COALESCE(LEAD(density) OVER(PARTITION BY i ORDER BY eps_bin), 0) AS left_external, |
| SUM(density) OVER(PARTITION BY i ORDER BY eps_bin)::BIGINT AS left_internal |
| FROM ( |
| SELECT i, generate_series(0, eps_bins - 1) AS eps_bin FROM world_bounds |
| ) g LEFT JOIN density_map USING(i, eps_bin) |
| ) params, |
| ( |
| SELECT {self.schema_madlib}.prod(size) AS V |
| FROM world_bounds |
| ) AS volume |
| ), |
| optimize AS ( |
| SELECT i, lower, cutoff, upper, |
| left_avail, right_avail, |
| left_internal, right_internal |
| FROM ( |
| SELECT |
| i, |
| CASE WHEN (losses).right_avail = 0 |
| THEN |
| points_in_node |
| ELSE |
| left_internal |
| END AS left_internal, |
| CASE WHEN (losses).right_avail = 0 |
| THEN |
| 0::BIGINT |
| ELSE |
| points_in_node - left_internal |
| END AS right_internal, |
| (losses).left_avail AS left_avail, |
| (losses).right_avail AS right_avail, |
| GREATEST((losses).left_loss,(losses).right_loss) AS total_loss, |
| min( |
| GREATEST((losses).left_loss,(losses).right_loss) |
| ) OVER() AS total_min, |
| wb.lower, |
| CASE WHEN (losses).right_avail = 0 |
| THEN |
| wb.upper |
| ELSE |
| wb.lower + {eps}*(eps_bin+1) |
| END AS cutoff, |
| wb.upper |
| FROM loss_table JOIN world_bounds wb USING(i) |
| ) a WHERE total_loss = total_min LIMIT 1 |
| ) |
| SELECT s.id, |
| ARRAY[i] AS coords, |
| CASE WHEN s.point[i] < cutoff |
| THEN left_avail |
| ELSE right_avail |
| END AS avail_segs, |
| CASE WHEN point[i] < cutoff |
| THEN left_internal |
| ELSE right_internal |
| END AS points_in_node, |
| CASE WHEN s.point[i] < cutoff |
| THEN ARRAY[lower] |
| ELSE ARRAY[cutoff] |
| END AS lower_bounds, |
| CASE WHEN s.point[i] < cutoff |
| THEN ARRAY[cutoff] |
| ELSE ARRAY[upper] |
| END AS upper_bounds, |
| (s.point[i] >= cutoff)::INT AS node_id |
| FROM optimize, {source_table} s |
| """.format(**locals()) |
| plpy.execute(first_cut_sql) |
| |
| segment_source_sql = """ |
| CREATE TEMP TABLE {next_cut} AS |
| WITH node_bounds AS ( |
| SELECT |
| i, |
| node_id, |
| avail_segs, |
| points_in_node, |
| min(point[i]) AS lower, |
| max(point[i])+{eps} AS upper, |
| max(point[i])+{eps} - min(point[i]) AS size, |
| floor((max(point[i])+{eps} - min(point[i])) / {eps})::INT AS eps_bins |
| FROM generate_series(1, {n_dims}) AS i, |
| {prev_node} JOIN {source_table} USING (id) |
| GROUP BY 1,2,3,4 |
| ), density_map AS ( |
| SELECT i, node_id, eps_bin, |
| COALESCE( density, 0) AS density |
| FROM ( |
| SELECT node_id, i, |
| generate_series(0, eps_bins - 1) AS eps_bin |
| FROM node_bounds |
| ) g LEFT JOIN ( |
| SELECT node_id, i, |
| floor((point[i] - lower)/{eps})::INT AS eps_bin, |
| COUNT(*) AS density |
| FROM |
| node_bounds JOIN {prev_node} USING (node_id) |
| JOIN {source_table} USING (id) |
| GROUP BY node_id, i, eps_bin |
| ) a |
| USING(i, node_id, eps_bin) |
| ), params AS ( |
| SELECT i, node_id, eps_bin, eps_bins, |
| COALESCE(density, 0) AS right_external, |
| COALESCE(LEAD(density) OVER(PARTITION BY node_id,i ORDER BY eps_bin), 0) AS left_external, |
| SUM(density) OVER(PARTITION BY node_id,i ORDER BY eps_bin)::BIGINT AS left_internal |
| FROM ( |
| SELECT i, node_id, eps_bins, generate_series(0, eps_bins - 1) AS eps_bin FROM node_bounds |
| ) g LEFT JOIN density_map USING(i, node_id, eps_bin) |
| ), volume AS ( |
| SELECT |
| node_id, |
| avail_segs, |
| points_in_node, |
| {self.schema_madlib}.prod(size) AS V |
| FROM node_bounds |
| WHERE i=1 |
| GROUP BY 1,2,3 |
| ), loss_table AS ( |
| SELECT i, node_id, eps_bin, |
| left_internal, |
| points_in_node - left_internal AS right_internal, |
| left_external, |
| right_external, |
| {self.schema_madlib}.__dbscan_segmentation_loss( |
| left_internal, |
| points_in_node, |
| left_external, |
| right_external, |
| avail_segs, |
| V, |
| {eps}::DOUBLE PRECISION, |
| {n_dims}::BIGINT, |
| eps_bin::DOUBLE PRECISION, |
| eps_bins::DOUBLE PRECISION |
| ) AS losses |
| FROM params JOIN volume USING (node_id) |
| ), optimize AS ( |
| SELECT * FROM ( |
| SELECT |
| node_id, |
| i, |
| (losses).left_avail, |
| (losses).right_avail, |
| CASE WHEN (losses).right_avail = 0 |
| THEN |
| points_in_node |
| ELSE |
| left_internal |
| END AS left_internal, |
| CASE WHEN (losses).right_avail = 0 |
| THEN |
| 0::BIGINT |
| ELSE |
| points_in_node - left_internal |
| END AS right_internal, |
| node_bounds.lower, |
| CASE WHEN (losses).right_avail = 0 |
| THEN |
| node_bounds.upper |
| ELSE |
| node_bounds.lower + {eps}*(eps_bin+1) |
| END AS cutoff, |
| node_bounds.upper, |
| ROW_NUMBER() OVER( |
| PARTITION BY node_id |
| ORDER BY GREATEST((losses).left_loss,(losses).right_loss) |
| ) AS loss_rank |
| FROM loss_table JOIN node_bounds USING(i, node_id) |
| ) a WHERE loss_rank = 1 |
| ) |
| SELECT id, coords || i AS coords, |
| CASE WHEN point[i] < opt.cutoff |
| THEN left_avail |
| ELSE right_avail |
| END AS avail_segs, |
| CASE WHEN point[i] < opt.cutoff |
| THEN left_internal |
| ELSE right_internal |
| END AS points_in_node, |
| lower_bounds || |
| CASE WHEN point[i] < opt.cutoff |
| THEN |
| lower |
| ELSE |
| opt.cutoff |
| END AS lower_bounds, |
| opt.cutoff, |
| upper_bounds || |
| CASE WHEN point[i] < opt.cutoff |
| THEN |
| opt.cutoff |
| ELSE |
| upper |
| END AS upper_bounds, |
| (node_id << 1) | (point[i] >= opt.cutoff)::INT AS node_id |
| FROM {source_table} |
| JOIN {prev_node} |
| USING (id) |
| JOIN optimize opt |
| USING (node_id) |
| """.format(**locals()) |
| |
| for i in range(max_depth): |
| DEBUG.plpy.info(segment_source_sql) |
| plpy.execute(segment_source_sql) |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {prev_node}; |
| ALTER TABLE {next_cut} RENAME TO {prev_node} |
| """.format(**locals())) |
| |
| if is_platform_pg(): |
| # For postgres, there should only be 1 node, |
| # so we assign it dist_id = __dist_id__ = 0 |
| plpy.execute(""" |
| CREATE TABLE {dist_map} AS |
| SELECT 0::BIGINT AS dist_id, 0::BIGINT as __dist_id__ |
| """.format(**locals())) |
| else: |
| # Create implicit many-to-1 map from __dist_id__ to seg_id |
| temp_dist_table = 'dist_table' + self.unique_str |
| create_dist_table_sql = """ |
| CREATE TEMP TABLE {temp_dist_table} AS |
| SELECT generate_series(1,10000) AS __dist_id__ |
| {distributed_by} |
| """.format(**locals()) |
| plpy.execute(create_dist_table_sql) |
| |
| # Create a 1-1 map from dist_id to __dist_id__, |
| # where dist_id's are re-ordered so they are evenly |
| # distributed among segments. This should work even |
| # if there are gaps in the dist_id assignments (some |
| # dist_id's missing) |
| leaves_sql = """ |
| SELECT |
| node_id AS dist_id, |
| ROW_NUMBER() OVER() AS leaf_num |
| FROM {prev_node} |
| GROUP BY node_id |
| """.format(**locals()) |
| leaves_cnt = plpy.execute(""" |
| WITH leaves as ({leaves_sql}) |
| SELECT COUNT(*) cnt from leaves |
| """.format(**locals()))[0]['cnt'] |
| create_dist_map_sql = """ |
| CREATE TEMP TABLE {dist_map} AS |
| WITH leaves as ( |
| {leaves_sql} |
| ), segmap AS ( |
| SELECT |
| gp_segment_id AS seg_id, |
| ROW_NUMBER() OVER( |
| ORDER BY ctid, gp_segment_id |
| ) AS leaf_num, |
| __dist_id__ |
| FROM {temp_dist_table} |
| LIMIT {leaves_cnt} |
| ) |
| SELECT dist_id, __dist_id__ |
| FROM leaves JOIN segmap |
| USING (leaf_num) |
| {distributed_by} |
| """.format(**locals()) |
| plpy.execute(create_dist_map_sql) |
| |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {temp_dist_table} |
| """.format(**locals())) |
| |
| gen_internal_points_sql = """ |
| CREATE TEMP TABLE {internal_points} AS |
| SELECT id, point, node_id AS leaf_id, node_id AS dist_id, __dist_id__ |
| FROM {source_table} |
| JOIN {prev_node} |
| USING (id) |
| JOIN {dist_map} |
| ON node_id = dist_id |
| {distributed_by} |
| """.format(**locals()) |
| DEBUG.plpy.execute(gen_internal_points_sql) |
| |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {prev_node} |
| """.format(**locals())) |
| |
| generate_leaf_bounds_sql = """ |
| CREATE TEMP TABLE {leaf_bounds_table} AS |
| SELECT |
| array_agg(min ORDER BY i) AS leaf_lower, |
| array_agg(max ORDER BY i) AS leaf_upper, |
| array_agg(min - {eps} ORDER BY i) AS dist_lower, |
| array_agg(max + {eps} ORDER BY i) AS dist_upper, |
| leaf_id, |
| __dist_id__ |
| FROM ( |
| SELECT |
| leaf_id, |
| i, |
| min(point[i]), |
| max(point[i]), |
| count(*) |
| FROM {internal_points}, generate_series(1, {n_dims}) AS i |
| GROUP BY leaf_id, i |
| ) x |
| JOIN {dist_map} |
| ON leaf_id = dist_id |
| GROUP BY leaf_id, __dist_id__ |
| {distributed_by} |
| """.format(**locals()) |
| DEBUG.plpy.execute(generate_leaf_bounds_sql) |
| |
| gen_external_points_sql = """ |
| CREATE TEMP TABLE {external_points} AS |
| SELECT i.id, i.point, i.leaf_id, b.leaf_id AS dist_id, b.__dist_id__ |
| FROM {leaf_bounds_table} b JOIN {internal_points} i |
| ON b.leaf_id != i.leaf_id |
| WHERE 0 <= all({self.schema_madlib}.array_sub(point, dist_lower)) AND |
| 0 < all({self.schema_madlib}.array_sub(dist_upper, point)) |
| {distributed_by} |
| """.format(**locals()) |
| plpy.execute(gen_external_points_sql) |
| |
| plpy.execute(""" |
| DROP TABLE IF EXISTS |
| {leaf_bounds_table}, {dist_map} |
| """.format(**locals())) |
| |
| def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column, |
| expr_point, output_table, **kwargs): |
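    """
    Entry point for the dbscan_predict() SQL interface.

    Each point in source_table is assigned the cluster_id of the closest
    core point in dbscan_table, provided the distance to it (under the
    metric recorded in the summary table) is within eps; the result is
    written to output_table as (id, cluster_id, distance).
    """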
| |
| with MinWarning("warning"): |
| |
| _validate_dbscan_predict(schema_madlib, dbscan_table, source_table, id_column, |
| expr_point, output_table) |
| |
| dbscan_summary_table = add_postfix(dbscan_table, '_summary') |
| summary = plpy.execute("SELECT * FROM {0}".format(dbscan_summary_table))[0] |
| |
| eps = summary['eps'] |
| metric = summary['metric'] |
| sql = """ |
| CREATE TABLE {output_table} AS |
| WITH src AS ( |
| SELECT ({id_column}) AS id, ({expr_point}) AS point FROM {source_table} |
| ) |
| SELECT __q1__.id, cluster_id, distance |
| FROM ( |
| SELECT __t2__.id, cluster_id, |
| min({schema_madlib}.{metric}(__t1__.point, |
| __t2__.point)) as distance |
| FROM {dbscan_table} AS __t1__, src AS __t2__ |
| WHERE is_core_point = TRUE |
| GROUP BY __t2__.id, cluster_id |
| ) __q1__ WHERE distance <= {eps} |
| """.format(**locals()) |
| result = plpy.execute(sql) |
| |
| class label: |
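    """ Sentinel cluster_id values used while labelling points. """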
| UNLABELLED = 0 |
| NOISE_POINT = -1 |
| |
| pstats = None |
| class dbscan_perfstats: |
| def __init__(self): |
| self.range_query_calls = 0 |
| self.range_query_time = 0.0 |
| self.range_query_candidates = 0 |
| self.range_query_neighbors = 0 |
| |
| def show(self, msg): |
| DEBUG.plpy.info("{}_stats: range_query total calls = {}" |
| .format(msg, self.range_query_calls)) |
| |
| range_query_calls = self.range_query_calls if self.range_query_calls else 0.00001 |
| |
| DEBUG.plpy.info("{}_stats: range_query total time = {}s" |
| .format(msg, self.range_query_time)) |
| DEBUG.plpy.info("{}_stats: range_query total candidates = {}" |
| .format(msg, self.range_query_candidates)) |
| DEBUG.plpy.info("{}_stats: range_query total neighbors returned = {}" |
| .format(msg, self.range_query_neighbors)) |
| DEBUG.plpy.info("{}_stats: range_query avg time = {}s" |
| .format(msg, self.range_query_time / range_query_calls)) |
| DEBUG.plpy.info("{}_stats: range_query avg candidates returned = {}" |
| .format(msg, self.range_query_candidates * 1.0 / range_query_calls)) |
| DEBUG.plpy.info("{}_stats: range_query avg neighbors returned = {}" |
| .format(msg, self.range_query_neighbors * 1.0 / range_query_calls)) |
| self.__init__() # Clear counters and start over |
| |
def within_range_tf(point, candidate_ids, candidate_points,
                    metric_cutoff, norm=2):
    # Experimental TensorFlow (1.x graph-mode) variant of the range query,
    # kept only for benchmarking; it is not called from the production path.
    # Assumes a `tf` (tensorflow) import and an active Session `sess` are
    # provided by the caller's environment.
    tfpoint = tf.constant(point)
    tfcandidates = tf.constant(candidate_points)
    sd = tf.squared_difference(tfcandidates, tfpoint)
    distances = tf.math.reduce_sum(sd, 1)
    output = tf.where(distances <= metric_cutoff)
    res = output.eval(session=sess)
    res.shape = res.shape[0]
    return candidate_ids[res]
| |
| def within_range_np_linalg(point, candidate_ids, candidate_points, |
| eps, norm=2): |
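    # Vectorized range query: keep the ids of candidates whose distance
    # from `point` under the given norm is within eps.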
| distances = np.linalg.norm(candidate_points - point, ord=norm, axis=1) |
| return candidate_ids[distances <= eps] |
| |
def within_range_np(point, candidate_ids, candidate_points,
                    metric_cutoff, norm=2):
    if norm == 1:
        distances = np.sum(np.abs(candidate_points - point), axis=1)
    else:
        distances = np.sum(np.square(candidate_points - point), axis=1)
    return candidate_ids[distances <= metric_cutoff]
| |
| class dbscan_record(object): |
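    """
    Base class for the rows streamed in and out of __dbscan_leaf().
    Holds the public output fields (id, point, is_core_point, cluster_id,
    leaf_id, dist_id); the internal/external subclasses add the private
    per-point bookkeeping used during clustering.
    """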
| def __init__(self, obj, eps=None): |
| if eps is not None: |
| self._init_from_dict(obj, eps) |
| else: |
| self._clone(obj) |
| |
| @staticmethod |
| def from_dict(d, eps): |
| if d['leaf_id'] == d['dist_id']: |
| return dbscan_internal_record(d, eps) |
| else: |
| return dbscan_external_record(d, eps) |
| |
| def _init_from_dict(self, d, eps): |
| """ |
| dbscan_record dict constructor |
| The only reason for passing eps here is so we |
| can pre-compute the point's neighborhood for later |
| """ |
| self.id = d['id'] |
| self.point = d['point'] |
| self.is_core_point = False |
| self.cluster_id = label.UNLABELLED |
| self.leaf_id = d['leaf_id'] |
| self.dist_id = d['dist_id'] |
| |
| # private members (for convenience, not for output) |
| p = np.array(self.point) |
| self._np_point = p |
| self._bounds = np.concatenate([p, p]) |
| self._neighborhood = np.concatenate([p - eps, p + eps]) |
| |
| def _clone(self, rec): |
| """ |
        dbscan_record copy constructor - called just before outputting
        any row. External points are cloned once per output row (one row
        per nearby cluster), while for internal points this is just a
        quick way of stripping the extra private members before output.
| """ |
| # Copy only public members |
| self.id = rec.id |
| self.point = rec.point |
| self.is_core_point = rec.is_core_point |
| self.cluster_id = rec.cluster_id |
| self.leaf_id = rec.leaf_id |
| self.dist_id = rec.dist_id |
| |
| def __repr__(self): |
| fmt = 'id={},point[{}],cluster_id={},core={},leaf_id={},dist_id={}' |
| rep = fmt.format(self.id, len(self.point), self.cluster_id, |
| self.is_core_point, self.leaf_id, self.dist_id) |
| return 'dbscan_record({})'.format(rep) |
| |
| def __eq__(self, other): |
| if not isinstance(other, dbscan_record): |
| return False |
| return self.__dict__ == other.__dict__ |
| |
| class dbscan_internal_record(dbscan_record): |
| def __init__(self, obj, eps=None): |
| dbscan_record.__init__(self, obj, eps) |
| |
| def _init_from_dict(self, d, eps): |
| """ |
| private members specific to internal records |
| """ |
| dbscan_record._init_from_dict(self, d, eps) |
| self._external_neighbors = [] |
| self._internal_neighbor_count = 0 |
| self._possible_border_points = [] |
| |
| def not_core_point(self, min_samples): |
| """ |
| Computes self.is_core_point from internal and |
| external neighbor counts, and returns its negation |
| """ |
| ext_neighbors = len(self._external_neighbors) |
| int_neighbors = self._internal_neighbor_count |
| num_neighbors = ext_neighbors + int_neighbors |
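        # The +1 accounts for the point itself, which is removed from the
        # rtree before its own range query and so never counts itself.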
| self.is_core_point = (num_neighbors + 1 >= min_samples) |
| return not self.is_core_point |
| |
| class dbscan_external_record(dbscan_record): |
| def __init__(self, obj, eps=None): |
| dbscan_record.__init__(self, obj, eps) |
| |
| def _init_from_dict(self, d, eps): |
| """ |
| private members specific to external records |
| """ |
| dbscan_record._init_from_dict(self, d, eps) |
| self._nearby_clusters = set() |
| |
| class LeafStorage: |
| """ |
| LeafStorage |
| |
| This is a per-leaf storage class for dbscan_leaf(). |
| There are 3 containers within LeafStorage: |
| |
| rtree - a spatial index of the id's of the leaf's internal points |
| records - a hash table for looking up dbscan_internal_records by id |
| external_records - an ordered list of dbscan_external_records |
| |
| Methods: |
| |
| add_point() - adds a point to LeafStorage |
| range_query() - returns a list of internal point id's within eps of a point |
| update_neighbors() - updates internal neighbor counts based on a point and |
| list of neighboring internal points |
| label_point() - assigns a point to a specific cluster, or marks it as noise |
| |
| yield_external_records() - clones and yields a copy of each dbscan_external_record |
| based on its list of nearby_clusters |
| """ |
| def __init__(self, dimension, dtype, eps, metric, min_samples): |
| self.n_dims = dimension |
| self.dtype = dtype |
| self.eps = eps |
| self.metric = metric |
| self.min_samples = min_samples |
| props = index.Property(dimension=dimension) |
| self.props = props |
| self.rtree = index.Index(properties=props) |
| self.records = {} |
| self.external_records = [] |
| |
| def add_point(self, rec): |
| if rec.leaf_id == rec.dist_id: |
| self.rtree.add(rec.id, rec._bounds) |
| self.records[rec.id] = rec |
| else: |
| self.external_records.append(rec) |
| |
| def range_query(self, db_rec): |
| """ |
        Returns the ids of all internal points within eps of db_rec's
        point, under the configured metric. For internal records, the
        record is removed from the rtree and the neighbor counts of it
        and its neighbors are updated; for external records, the record
        is appended to the _external_neighbors list of each internal
        neighbor found.
| """ |
| global pstats |
| pstats.range_query_calls += 1 |
| start_time = time() |
| |
| eps = self.eps |
| metric = self.metric |
| records = self.records |
| rtree = self.rtree |
| |
| if not metric in ('squared_dist_norm2', 'dist_norm2', 'dist_norm1'): |
| plpy.error("Sorry, optimized method does not support {} metric yet".format(metric)) |
| |
| norm = 1 if metric == 'dist_norm1' else 2 |
| metric_cutoff = eps if metric == 'dist_norm1' else eps*eps |
| |
| if isinstance(db_rec, dbscan_internal_record): |
            # As long as we update the neighbor counts of a core point's
            # neighbors before returning, we can safely remove it from
            # the rtree for future searches. If this is a non-core point,
            # we still delete it, but add its id to the list of
            # _possible_border_points in the db_rec of each of its
            # unlabelled neighbors.
| self.rtree.delete(db_rec.id, db_rec._bounds) |
| |
| DEBUG.start_timing('rtree_count') |
| n = rtree.count(db_rec._neighborhood) |
| DEBUG.print_timing('rtree_count') |
| |
| DEBUG.start_timing('rtree_intersection') |
| ids = rtree.intersection(db_rec._neighborhood, objects=False) |
| DEBUG.print_timing('rtree_intersection') |
| |
| candidate_ids = np.empty(n, dtype='int64') |
| candidate_points = np.empty([n, self.n_dims], dtype=self.dtype) |
| i = 0 |
| for id in ids: |
| candidate_ids[i] = id |
| candidate_points[i] = records[id]._np_point |
| i += 1 |
| |
        # Avoid catastrophic issues which are extremely difficult to debug
| assert i == n |
| |
| pstats.range_query_candidates += n |
| |
| DEBUG.start_timing('within_range_np_linalg') |
| neighbors = within_range_np_linalg( |
| db_rec._np_point, |
| candidate_ids, |
| candidate_points, |
| eps, |
| norm |
| ) |
| DEBUG.print_timing('within_range_np_linalg') |
| |
| pstats.range_query_neighbors += len(neighbors) |
| end_time = time() |
| pstats.range_query_time += (end_time - start_time) |
| |
| if isinstance(db_rec, dbscan_internal_record): |
| # Necessary because we are deleting it from the rtree |
| # (won't show up in future searches) |
| self.update_neighbors(db_rec, neighbors) |
| neighbors = np.concatenate([neighbors, np.array(db_rec._possible_border_points, dtype='int64')]) |
| elif isinstance(db_rec, dbscan_external_record): |
| for id in neighbors: |
| records[id]._external_neighbors.append(db_rec) |
| else: |
| raise Exception("range_query() received unexpected type {}".format(type(db_rec))) |
| |
| return neighbors |
| |
| def update_neighbors(self, db_rec, neighbors): |
| db_rec._internal_neighbor_count += len(neighbors) |
| |
| not_core_point = db_rec.not_core_point(self.min_samples) |
| |
| for n_id in neighbors: |
| n_rec = self.records[n_id] |
| n_rec._internal_neighbor_count += 1 |
| if not_core_point and n_rec.cluster_id == label.UNLABELLED: |
| n_rec._possible_border_points.append(db_rec.id) |
| |
| |
| def label_point(self, rec, cluster): |
| """ |
| Labels a point as a member of a particular cluster, or if |
| cluster = None as a NOISE_POINT. |
| """ |
| rec.cluster_id = cluster |
| |
| if isinstance(rec, dbscan_internal_record): |
| if cluster != label.NOISE_POINT: |
| # Mark nearby external points as near this cluster |
| for ext_neighbor in rec._external_neighbors: |
| ext_neighbor._nearby_clusters.add(cluster) |
| |
| def yield_external_records(self): |
| external_records = self.external_records |
| |
| for rec in external_records: |
| for cluster_id in rec._nearby_clusters: |
| rec_clone = dbscan_external_record(rec) |
| self.label_point(rec_clone, cluster_id) |
| yield rec_clone |
| |
| DEBUG_WITH_TRACEBACKS = False |
| |
# Thin wrapper around _dbscan_leaf(). With DEBUG_WITH_TRACEBACKS set, the
# generator is materialized into a list so that segment tracebacks are
# forwarded instead of being lost inside the generator.
| def dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points, |
| num_external_points, SD, **kwargs): |
| res = _dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points, |
| num_external_points, SD) |
| |
| if DEBUG_WITH_TRACEBACKS: |
| ret = [] |
| for i, db_rec in enumerate(res): |
| ret.append(db_rec) # Returning a generator breaks the traceback forwarding |
| return ret |
| else: |
| return res |
| |
| def _dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points, |
| num_external_points, SD): |
| """ |
| Performs classic dbscan algorithm in parallel on each leaf |
| """ |
| global pstats |
| |
| num_points = num_internal_points + num_external_points |
| |
| db_rec = dbscan_record.from_dict(db_rec, eps) |
| id = db_rec.id |
| dist_id = db_rec.dist_id |
| |
| if dist_id in SD: |
| leaf = SD[dist_id] |
| else: |
| pstats = dbscan_perfstats() |
| DEBUG.start_timing("dbscan_leaf") |
| DEBUG.start_timing("dbscan_leaf_load_rtree") |
| leaf = LeafStorage( len(db_rec.point), |
| db_rec._np_point.dtype, |
| eps, |
| metric, |
| min_samples |
| ) |
| SD[dist_id] = leaf |
| |
| records = leaf.records |
| external_records = leaf.external_records |
| |
| leaf.add_point(db_rec) |
| |
| if len(records) == num_internal_points and \ |
| len(external_records) == num_external_points: |
| DEBUG.print_timing("dbscan_leaf_load_rtree") |
| DEBUG.start_timing("dbscan_leaf_external") |
| |
| # Find all internal neighbors of external points |
| for ext_rec in external_records: |
| DEBUG.start_timing("external_rangequery") |
            # We don't need to save the result; we only care about the
            # side effect of adding this point to the _external_neighbors
            # list of each internal neighbor found
| _ = leaf.range_query(ext_rec) |
| DEBUG.print_timing("external_rangequery") |
| |
| DEBUG.print_timing("dbscan_leaf_external") |
| pstats.show("external") |
| pstats = dbscan_perfstats() |
| |
| # Classify internal points into clusters (main dbscan algorithm), |
| # and yield resulting internal records |
| DEBUG.start_timing("dbscan_leaf_internal") |
| cluster = 0 |
| for rec in records.values(): |
| if rec.cluster_id != label.UNLABELLED: |
| continue |
| |
| DEBUG.start_timing('internal_rangequery') |
| neighbors = leaf.range_query(rec) |
| DEBUG.print_timing('internal_rangequery') |
| |
| if not rec.is_core_point: # rec.is_core_point field will be set inside range_query(rec) |
| leaf.label_point(rec, label.NOISE_POINT) |
| continue |
| |
| cluster += 1 |
| |
| leaf.label_point(rec, cluster) |
| yield dbscan_record(rec) |
| |
| queue = deque([neighbors]) |
| DEBUG.start_timing('internal_per_cluster') |
| while len(queue) > 0: |
| for next_id in queue.popleft(): |
| rec2 = records[next_id] |
| if rec2.cluster_id != label.NOISE_POINT and \ |
| rec2.cluster_id != label.UNLABELLED: |
| # We delete all points from the unlabelled |
| # rtree as soon as we assign them to a cluster. |
| # But we will still get here if the same point |
| # gets returned by more than one range query |
| # and added to the queue before the first reference |
| # to it gets popped and processed. The point will |
| # be labelled as a part of this cluster the first time |
| # it gets popped. We can ignore any duplicate references |
| # to it that get popped after that. |
| continue |
| |
| DEBUG.start_timing('internal_rangequery') |
| neighbors = leaf.range_query(rec2) |
| DEBUG.print_timing('internal_rangequery') |
| |
| if not rec2.is_core_point: # rec2.is_core_point field will be set inside range_query(rec2) |
| # Label as border point |
| leaf.label_point(rec2, cluster) |
| yield dbscan_record(rec2) |
| continue |
| |
| queue.append(neighbors) |
| # Label as core point |
| leaf.label_point(rec2, cluster) |
| yield dbscan_record(rec2) |
| |
| DEBUG.print_timing('internal_per_cluster') |
| |
| DEBUG.print_timing("dbscan_leaf_internal") |
| pstats.show("internal") |
| |
| # Return rows for external records (one for each |
| # cluster an external point is close to) |
| DEBUG.start_timing("yield_external_records") |
| for ext_rec in leaf.yield_external_records(): |
| yield ext_rec |
| DEBUG.print_timing("yield_external_records") |
| |
| # Clean up for next time, if there is one |
| del SD[dist_id] |
| |
| DEBUG.print_timing("dbscan_leaf") |
| |
# The snowflake table is used to reduce the size of the edge table.
# The sole purpose of the edge table is finding the connected components,
# which means removing some of the edges is fine as long as each
# component stays intact.
| |
| # We call it snowflake because the end result will look like a point in the |
| # middle with a bunch of edges coming out of it to the other connected points. |
| |
| # Example: |
| # Edges: [1,2] [1,3] [1,4] [2,3] [2,4] [3,1] [3,4] [5,6] [6,7] [7,8] [7,5] |
| # Result: 1 and 5 are snowflake cores |
| # Edges: [1,2] [1,3] [1,4] [5,6] [5,7] [5,8] |
| |
| # This is a proto-wcc operation (creates connected components) but we still |
| # need to run the actual wcc to combine snowflakes from different segments. |
| |
| def sf_transition(state, src, dest, n_rows, gp_segment_id, **kwargs): |
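    """
    Transition function for the build_snowflake_table aggregate.

    Edges are buffered in SD['data'] until all n_rows[gp_segment_id]
    edges for this segment have arrived.  The buffered edges are then
    grouped into connected components by label propagation, and for each
    component one vertex is picked as the snowflake center; a star of
    (center, vertex) edges is emitted, which keeps the component
    connected while dropping the remaining edges.
    """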
| |
| SD = kwargs['SD'] |
| if not state: |
| data = [] |
| SD['counter'] = 0 |
| else: |
| data = SD['data'] |
| |
| counter = SD['counter'] |
| |
| data.append([src,dest]) |
| ret = [[-1,-1],[-1,-1]] |
| |
| my_n_rows = n_rows[gp_segment_id] |
| |
| if len(data) == my_n_rows: |
| |
| cl_ids = {} |
| clid_counter = 1 |
| cl_counts = {} |
| for i in data: |
| |
| key1 = i[0] |
| key2 = i[1] |
| |
| cl_id_k1 = cl_ids[key1] if key1 in cl_ids else None |
| cl_id_k2 = cl_ids[key2] if key2 in cl_ids else None |
| |
| if not cl_id_k1 and not cl_id_k2: |
| cl_ids[key1] = clid_counter |
| cl_ids[key2] = clid_counter |
| cl_counts[clid_counter] = 2 |
| clid_counter += 1 |
| elif cl_id_k1 and not cl_id_k2: |
| cl_ids[key2] = cl_id_k1 |
| cl_counts[cl_ids[key1]] += 1 |
| elif not cl_id_k1 and cl_id_k2: |
| cl_ids[key1] = cl_id_k2 |
| cl_counts[cl_ids[key2]] += 1 |
| else: |
| if cl_id_k1 != cl_id_k2: |
| if cl_counts[cl_id_k1] > cl_counts[cl_id_k2]: |
| for chkey,chvalue in cl_ids.items(): |
| if chvalue == cl_id_k2: |
| cl_ids[chkey] = cl_id_k1 |
| cl_counts[cl_id_k1] += cl_counts[cl_id_k2] |
| cl_counts[cl_id_k2] = 0 |
| else: |
| for chkey,chvalue in cl_ids.items(): |
| if chvalue == cl_id_k1: |
| cl_ids[chkey] = cl_id_k2 |
| cl_counts[cl_id_k2] += cl_counts[cl_id_k1] |
| cl_counts[cl_id_k1] = 0 |
| |
| if cl_ids: |
| |
| running_cl_id = -1 |
| running_sf_center = -1 |
| ret = [] |
| for vertex_id, vertex_cl in sorted(cl_ids.items(), key=lambda item: item[1]): |
| |
| # Check if we are still in the same snowflake |
| if vertex_cl != running_cl_id: |
| |
| running_sf_center = vertex_id |
| running_cl_id = vertex_cl |
| else: |
| ret.append([running_sf_center,vertex_id]) |
| |
| SD['data'] = data |
| |
| return ret |
| |
| def sf_merge(state1, state2, **kwargs): |
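    # Only one partial state per group is expected to be non-empty,
    # so merging simply keeps whichever state is set.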
| |
| if state1: |
| return state1 |
| else: |
| return state2 |
| |
| def sf_final(state, **kwargs): |
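    # The accumulated snowflake edges are returned unchanged.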
| |
| return state |
| |
| def _validate_dbscan(schema_madlib, source_table, output_table, id_column, |
| expr_point, eps, min_samples, metric, algorithm, max_depth): |
| |
| input_tbl_valid(source_table, 'dbscan') |
| output_tbl_valid(output_table, 'dbscan') |
| output_summary_table = add_postfix(output_table, '_summary') |
| output_tbl_valid(output_summary_table, 'dbscan') |
| |
| _assert(is_var_valid(source_table, id_column), |
| "dbscan error: {0} is an invalid column or " |
| "expression for id param".format(id_column)) |
| |
| id_col_type = get_expr_type(id_column, source_table) |
| _assert(is_valid_psql_type(id_col_type, INTEGER), |
| "dbscan Error: unique id column or expression '{0}' in input table must" |
| " evaluate to an integer type.".format(id_column)) |
| |
| _assert(is_var_valid(source_table, expr_point), |
| "dbscan error: {0} is an invalid column name or " |
| "expression for expr_point param".format(expr_point)) |
| |
| point_col_type = get_expr_type(expr_point, source_table) |
| _assert(is_valid_psql_type(point_col_type, NUMERIC | ONLY_ARRAY), |
| "dbscan Error: Feature column or expression '{0}' in input table is not" |
| " a numeric array.".format(expr_point)) |
| |
| _assert(eps > 0, "dbscan Error: eps has to be a positive number") |
| |
| _assert(min_samples > 0, "dbscan Error: min_samples has to be a positive number") |
| |
| fn_dist_list = ['dist_norm1', 'dist_norm2', 'squared_dist_norm2', |
| 'dist_angle', 'dist_tanimoto'] |
| _assert(metric in fn_dist_list, |
| "dbscan Error: metric has to be one of the madlib defined distance functions") |
| |
| _assert(algorithm == METHOD_BRUTE_FORCE or RTREE_ENABLED == 1, |
| "dbscan Error: Cannot use rtree without the necessary python module: rtree") |
| _assert(max_depth is None or max_depth >= 0, |
| "dbscan Error: max_segmentation_depth must be a non-negative integer or NULL") |
| |
| def _validate_dbscan_predict(schema_madlib, dbscan_table, source_table, |
| id_column, expr_point, output_table): |
| |
| input_tbl_valid(source_table, 'dbscan') |
| input_tbl_valid(dbscan_table, 'dbscan') |
| dbscan_summary_table = add_postfix(dbscan_table, '_summary') |
| input_tbl_valid(dbscan_summary_table, 'dbscan') |
| output_tbl_valid(output_table, 'dbscan') |
| |
| _assert(is_var_valid(source_table, id_column), |
| "dbscan error: {0} is an invalid column name or " |
| "expression for id param".format(id_column)) |
| |
| id_col_type = get_expr_type(id_column, source_table) |
| _assert(is_valid_psql_type(id_col_type, INTEGER), |
| "dbscan Error: unique id column or expression '{0}' in input table must" |
| " evaluate to an integer type.".format(id_column)) |
| |
| _assert(is_var_valid(source_table, expr_point), |
| "dbscan error: {0} is an invalid column name or " |
| "expression for expr_point param".format(expr_point)) |
| |
| point_col_type = get_expr_type(expr_point, source_table) |
| _assert(is_valid_psql_type(point_col_type, NUMERIC | ONLY_ARRAY), |
| "dbscan Error: Feature column or expression '{0}' in train table is not" |
| " a numeric array.".format(expr_point)) |
| |
| def dbscan_help(schema_madlib, message=None, **kwargs): |
| """ |
| Help function for dbscan |
| |
| Args: |
| @param schema_madlib |
| @param message: string, Help message string |
| @param kwargs |
| |
| Returns: |
| String. Help/usage information |
| """ |
| if message is not None and \ |
| message.lower() in ("usage", "help", "?"): |
| help_string = """ |
| ----------------------------------------------------------------------- |
| USAGE |
| ----------------------------------------------------------------------- |
| SELECT {schema_madlib}.dbscan( |
| source_table, -- Name of the training data table |
| output_table, -- Name of the output table |
| id_column, -- A column or expression of columns specifying a unique id for rows in the source_table |
| expr_point, -- Column name or expression for data points |
    eps,                    -- Neighborhood radius: the maximum distance between
                            -- two points for them to be considered neighbors
    min_samples,            -- The minimum number of neighbors (including the
                            -- point itself) required for a point to be a core point
    metric,                 -- The name of the function used to calculate the
                            -- distance
    algorithm,              -- The algorithm to use for dbscan: brute_force or optimized (default=optimized)
    max_segmentation_depth  -- This has no effect on algorithm=brute_force.  For algorithm=optimized,
                               this limits the number of splits to be considered while dividing
                               the dataset into smaller spatial regions for optimal parallel performance.
                               The default ( NULL or omitted ) value means it is unlimited,
                               but note that the data will never be broken up into more regions than
                               there are segments in a greenplum cluster, and in some cases fewer may
                               be chosen.  Setting this to a lower value decreases up-front overhead,
                               and can be useful if you expect to find only a small number of densely
                               populated clusters, or if the dataset itself is small enough that initial
                               planning overhead matters more than scaling performance.  This parameter
                               does not affect the output, only the performance.
| ); |
| |
| ----------------------------------------------------------------------- |
| OUTPUT |
| ----------------------------------------------------------------------- |
| The output of the dbscan function is a table with the following columns: |
| |
| id The id of each point which has been assigned to a cluster |
| cluster_id The cluster_id's assigned to each point |
| is_core_point Boolean column that indicates if the point is core or not |
| point The coordinates of each classified point |
| """ |
| else: |
| help_string = """ |
| ---------------------------------------------------------------------------- |
| SUMMARY |
| ---------------------------------------------------------------------------- |
| DBSCAN is a density-based clustering algorithm. Given a set of points in |
| some space, it groups together points that are closely packed together |
| (points with many nearby neighbors), marking as outliers points that lie |
| alone in low-density regions (whose nearest neighbors are too far away). |
| -- |
| For an overview on usage, run: |
| SELECT {schema_madlib}.dbscan('usage'); |
| SELECT {schema_madlib}.dbscan_predict('usage'); |
| """ |
| return help_string.format(schema_madlib=schema_madlib) |
| # ------------------------------------------------------------------------------ |
| |
| def dbscan_predict_help(schema_madlib, message=None, **kwargs): |
| """ |
| Help function for dbscan |
| |
| Args: |
| @param schema_madlib |
| @param message: string, Help message string |
| @param kwargs |
| |
| Returns: |
| String. Help/usage information |
| """ |
| if message is not None and \ |
| message.lower() in ("usage", "help", "?"): |
| help_string = """ |
| ----------------------------------------------------------------------- |
| USAGE |
| ----------------------------------------------------------------------- |
| SELECT {schema_madlib}.dbscan_predict( |
    dbscan_table,           -- Name of the dbscan output table
    source_table,           -- Name of the table containing the points to predict on
    id_column,              -- A column or expression of columns specifying a unique id for rows in the source_table
    expr_point,             -- Column name or expression for data points
    output_table            -- Name of the output table
);
| |
| ----------------------------------------------------------------------- |
| OUTPUT |
| ----------------------------------------------------------------------- |
The output of dbscan_predict is a table with the following columns:

id              The id of each point that is within eps of a core point
cluster_id      The cluster_id of the matching core points (a point may
                appear once for each such cluster)
distance        The distance from the point to the closest core point of
                that cluster
| """ |
| else: |
| help_string = """ |
| ---------------------------------------------------------------------------- |
| SUMMARY |
| ---------------------------------------------------------------------------- |
| DBSCAN is a density-based clustering algorithm. Given a set of points in |
| some space, it groups together points that are closely packed together |
| (points with many nearby neighbors), marking as outliers points that lie |
| alone in low-density regions (whose nearest neighbors are too far away). |
| -- |
| For an overview on usage, run: |
| SELECT {schema_madlib}.dbscan('usage'); |
| SELECT {schema_madlib}.dbscan_predict('usage'); |
| """ |
| return help_string.format(schema_madlib=schema_madlib) |
| # ------------------------------------------------------------------------------ |