# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import plpy
from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import unique_string
from utilities.utilities import add_postfix
from utilities.utilities import INTEGER, NUMERIC, ONLY_ARRAY
from utilities.utilities import is_valid_psql_type
from utilities.utilities import is_platform_pg
from utilities.utilities import num_features
from utilities.utilities import get_seg_number
from utilities.validate_args import input_tbl_valid, output_tbl_valid
from utilities.validate_args import is_var_valid
from utilities.validate_args import cols_in_tbl_valid
from utilities.validate_args import get_expr_type
from utilities.validate_args import get_algorithm_name
from graph.wcc import wcc
from math import log
from math import ceil
from math import sqrt
from time import time
import numpy as np
from collections import deque
import utilities.debug as DEBUG
DEBUG.plpy_info_enabled = False
DEBUG.plpy_execute_enabled = False
DEBUG.timings_enabled = False
try:
from rtree import index
except ImportError:
RTREE_ENABLED=0
else:
RTREE_ENABLED=1
METHOD_BRUTE_FORCE = 'brute_force'
METHOD_OPTIMIZED = 'optimized'
DEFAULT_MIN_SAMPLES = 5
DEFAULT_METRIC = 'squared_dist_norm2'
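# Hedged example of the SQL entry point that maps to dbscan() below.  The
# table and column names ('points_in', 'clusters_out', 'pid', 'coords') are
# hypothetical; the argument order follows the usage text at the bottom of
# this file:
#
#   SELECT madlib.dbscan('points_in', 'clusters_out', 'pid', 'coords',
#                        1.5,                  -- eps
#                        4,                    -- min_samples
#                        'squared_dist_norm2', -- metric
#                        'optimized');         -- algorithm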
def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
eps, min_samples, metric, algorithm, max_segmentation_depth, **kwargs):
with MinWarning("warning"):
# algorithm=None is handled in get_algorithm_name()
min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
metric = DEFAULT_METRIC if not metric else metric
num_segs = get_seg_number()
algorithm = get_algorithm_name(algorithm, METHOD_OPTIMIZED,
[METHOD_BRUTE_FORCE, METHOD_OPTIMIZED], 'DBSCAN')
if max_segmentation_depth is None:
# Default to num_segs
max_depth = num_segs
else:
max_depth = max_segmentation_depth
if algorithm != METHOD_OPTIMIZED:
plpy.warning("Ignoring max_segmentation_depth={} param, "
"N/A to algorithm={}".format(max_depth, algorithm))
_validate_dbscan(schema_madlib, source_table, output_table, id_column,
expr_point, eps, min_samples, metric, algorithm, max_depth)
dist_by_src = '' if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
dist_by_id = '' if is_platform_pg() else 'DISTRIBUTED BY (id)'
dist_by_reach_id = '' if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
dist_by_seg_id = '' if is_platform_pg() else 'DISTRIBUTED BY (seg_id)'
distributed_by = '' if is_platform_pg() else 'DISTRIBUTED BY (__dist_id__)'
core_points_table = unique_string(desp='core_points_table')
core_edge_table = unique_string(desp='core_edge_table')
distance_table = unique_string(desp='distance_table')
source_view = unique_string(desp='source_view')
plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view))
sql = """
CREATE VIEW {source_view} AS
SELECT ({id_column})::BIGINT AS id, ({expr_point})::DOUBLE PRECISION[] AS point
FROM {source_table}
""".format(**locals())
plpy.execute(sql)
if algorithm == METHOD_OPTIMIZED:
dbscan_opt = DBSCAN_optimized(schema_madlib, source_view, output_table,
eps, min_samples, metric, max_depth)
dbscan_opt.run()
else:
# Calculate pairwise distances
sql = """
CREATE TEMP TABLE {distance_table} AS
SELECT __src__, __dest__ FROM (
SELECT __t1__.id AS __src__,
__t2__.id AS __dest__,
{schema_madlib}.{metric}(
__t1__.point, __t2__.point) AS __dist__
FROM {source_view} AS __t1__, {source_view} AS __t2__)q1
WHERE __dist__ < {eps}
{dist_by_src}
""".format(**locals())
plpy.execute(sql)
# Find core points
sql = """
CREATE TEMP TABLE {core_points_table} AS
SELECT * FROM (
SELECT __src__ AS id, count(*) AS __count__
FROM {distance_table} GROUP BY __src__) q1
WHERE __count__ >= {min_samples}
{dist_by_id}
""".format(**locals())
plpy.execute(sql)
# Find the connections between core points to form the clusters
sql = """
CREATE TEMP TABLE {core_edge_table} AS
SELECT __src__, __dest__
FROM {distance_table} AS __t1__, (SELECT array_agg(id) AS arr
FROM {core_points_table}) __t2__
WHERE __t1__.__src__ = ANY(arr) AND __t1__.__dest__ = ANY(arr)
{dist_by_src}
""".format(**locals())
plpy.execute(sql)
sql = """
SELECT count(*) FROM {core_points_table}
""".format(**locals())
core_count = plpy.execute(sql)[0]['count']
_assert(core_count != 0, "DBSCAN: Cannot find any core points/clusters.")
# Start snowflake creation
if is_platform_pg():
sql = """
SELECT count(*) FROM {core_edge_table}
""".format(**locals())
count = plpy.execute(sql)[0]['count']
counts_list = [int(count)]
seg_id = 0
group_by_clause = ''
else:
sql = """
SELECT count(*), gp_segment_id FROM {core_edge_table} GROUP BY gp_segment_id
""".format(**locals())
count_result = plpy.execute(sql)
seg_num = get_seg_number()
counts_list = [0]*seg_num
for i in count_result:
counts_list[int(i['gp_segment_id'])] = int(i['count'])
seg_id = 'gp_segment_id'
group_by_clause = 'GROUP BY gp_segment_id'
snowflake_table = unique_string(desp='snowflake_table')
sf_edge_table = unique_string(desp='sf_edge_table')
plpy.execute("DROP TABLE IF EXISTS {0}, {1}".format(
snowflake_table, sf_edge_table))
sql = """
CREATE TEMP TABLE {snowflake_table} AS
SELECT {seg_id}::INTEGER AS seg_id,
{schema_madlib}.build_snowflake_table( __src__,
__dest__,
ARRAY{counts_list},
{seg_id}
) AS __sf__
FROM {core_edge_table} {group_by_clause}
{dist_by_seg_id}
""".format(**locals())
plpy.execute(sql)
sql = """
CREATE TEMP TABLE {sf_edge_table} AS
SELECT seg_id, (edge).src AS __src__, (edge).dest AS __dest__
FROM (
SELECT seg_id,
{schema_madlib}.__dbscan_unpack_edge(__sf__) AS edge
FROM {snowflake_table}
) q1
{dist_by_seg_id}
""".format(**locals())
plpy.execute(sql)
# Run wcc to get the min id for each cluster
wcc(schema_madlib, core_points_table, 'id', sf_edge_table,
'src=__src__, dest=__dest__', output_table, None)
plpy.execute("""
ALTER TABLE {0}
ADD COLUMN is_core_point BOOLEAN,
ADD COLUMN point DOUBLE PRECISION[];
""".format(output_table)
)
plpy.execute("""
ALTER TABLE {0}
RENAME COLUMN component_id TO cluster_id;
""".format(output_table)
)
plpy.execute("""
UPDATE {0}
SET is_core_point = TRUE
""".format(output_table)
)
# Find reachable points
reachable_points_table = unique_string(desp='reachable_points_table')
plpy.execute("DROP TABLE IF EXISTS {0}".format(reachable_points_table))
sql = """
CREATE TEMP TABLE {reachable_points_table} AS
SELECT array_agg(__src__) AS __src_list__,
__dest__ AS __reachable_id__
FROM {distance_table} AS __t1__,
(SELECT array_agg(id) AS __arr__
FROM {core_points_table}) __t2__
WHERE __src__ = ANY(__arr__) AND __dest__ != ALL(__arr__)
GROUP BY __dest__
{dist_by_reach_id}
""".format(**locals())
plpy.execute(sql)
sql = """
INSERT INTO {output_table}
SELECT __reachable_id__,
cluster_id,
FALSE AS is_core_point,
NULL AS point
FROM {reachable_points_table} AS __t1__ JOIN
{output_table} AS __t2__
ON (__src_list__[1] = id)
""".format(**locals())
plpy.execute(sql)
# Add features of points to the output table to use them for prediction
sql = """
UPDATE {output_table} AS __t1__
SET point = __t2__.point
FROM {source_view} AS __t2__
WHERE __t1__.id = __t2__.id
""".format(**locals())
plpy.execute(sql)
plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}, {5}".format(
distance_table, core_points_table, core_edge_table,
reachable_points_table, snowflake_table, sf_edge_table))
plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view))
# -- Update the cluster ids to be consecutive --
sql = """
UPDATE {output_table} AS __t1__
SET cluster_id = new_id-1
FROM (
SELECT cluster_id, row_number() OVER(ORDER BY cluster_id) AS new_id
FROM {output_table}
GROUP BY cluster_id) __t2__
WHERE __t1__.cluster_id = __t2__.cluster_id
""".format(**locals())
plpy.execute(sql)
# -- Generate output summary table ---
output_summary_table = add_postfix(output_table, '_summary')
plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table))
sql = """
CREATE TABLE {output_summary_table} AS
SELECT $${id_column}$$::VARCHAR AS id_column,
$${expr_point}$$::VARCHAR AS expr_point,
{eps}::DOUBLE PRECISION AS eps,
{min_samples}::BIGINT AS min_points,
'{metric}'::VARCHAR AS metric
""".format(**locals())
plpy.execute(sql)
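# Hedged sketch of what dbscan() produces (values are hypothetical): the
# output table holds one row per clustered point, e.g.
#   id | cluster_id | is_core_point | point
#    7 |          0 | t             | {1.1,2.0}
# and the <output_table>_summary table records the id_column, expr_point,
# eps, min_points and metric that were passed in.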
class DBSCAN(object):
def __init__(self, schema_madlib, source_table, output_table,
eps, min_samples, metric):
"""
Args:
@param schema_madlib Name of the Madlib Schema
@param source_table Training data table
@param output_table Output table of points labelled by cluster_id
@param eps The eps value defined by the user
@param min_samples min_samples defined by user
@param metric metric selected by user
"""
self.schema_madlib = schema_madlib
self.source_table = source_table
self.output_table = output_table
self.n_dims = num_features(source_table, 'point')
self.distributed_by = '' if is_platform_pg() else 'DISTRIBUTED BY (__dist_id__)'
self.min_samples = min_samples
self.metric = metric
# If squared_dist_norm2 is used, we assume eps is set for the squared
# distance. That means the border width must be sqrt(eps)
self.eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
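# Worked illustration of the eps conversion above (hypothetical values):
# with metric='squared_dist_norm2' and a user-supplied eps of 0.25, two
# points are neighbors when their squared distance is <= 0.25, i.e. their
# Euclidean distance is <= sqrt(0.25) = 0.5, so 0.5 is the border width
# used when segmenting the space.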
class DBSCAN_optimized(DBSCAN):
def __init__(self, schema_madlib, source_table, output_table,
eps, min_samples, metric, max_depth=None):
"""
Args:
@param schema_madlib Name of the Madlib Schema
@param source_table Training data table
@param output_table Output table of points labelled by cluster_id
@param eps The eps value defined by the user
@param min_samples min_samples defined by user
@param metric metric selected by user
"""
DBSCAN.__init__(self, schema_madlib, source_table, output_table,
eps, min_samples, metric)
self.max_depth = max_depth
self.unique_str = unique_string('DBSCAN_opt')
self.num_segs = get_seg_number()
res = plpy.execute("SELECT COUNT(*) FROM {source_table}".format(**locals()))
self.num_points = res[0]['count']
def run(self):
unique_str = self.unique_str
distributed_by = self.distributed_by
schema_madlib = self.schema_madlib
self.dist_map = 'dist_map' + unique_str
segmented_source = 'segmented_source' + unique_str
self.segmented_source = segmented_source
self.build_optimized_tree()
rowcount_table = 'rowcounts' + unique_str
leaf_clusters_table = 'leaf_clusters' + unique_str
count_rows_sql = """
DROP TABLE IF EXISTS {rowcount_table};
CREATE TEMP TABLE {rowcount_table} AS
WITH i AS (
SELECT
count(*) AS num_rows,
dist_id,
__dist_id__
FROM internal_{segmented_source}
GROUP BY dist_id, __dist_id__
), e AS (
SELECT
count(*) AS num_rows,
dist_id,
__dist_id__
FROM external_{segmented_source}
GROUP BY dist_id, __dist_id__
)
SELECT
i.num_rows AS num_internal_rows,
COALESCE(e.num_rows,0) AS num_external_rows,
i.dist_id,
__dist_id__
FROM i LEFT JOIN e
USING (dist_id, __dist_id__)
{distributed_by}
""".format(**locals())
plpy.execute(count_rows_sql)
# Run dbscan in parallel on each leaf
dbscan_leaves_sql = """
CREATE TEMP TABLE {leaf_clusters_table} AS
WITH input AS (
SELECT *
FROM internal_{segmented_source}
UNION ALL
SELECT *
FROM external_{segmented_source}
), segmented_source AS (
SELECT
ROW(
id,
point,
FALSE,
0,
leaf_id,
dist_id
)::{schema_madlib}.__dbscan_record AS rec,
__dist_id__
FROM input
)
SELECT (output).*, __dist_id__ FROM (
SELECT
{schema_madlib}.__dbscan_leaf(
rec,
{self.eps},
{self.min_samples},
'{self.metric}',
num_internal_rows,
num_external_rows
) AS output,
__dist_id__
FROM segmented_source JOIN {rowcount_table}
USING (__dist_id__)
) a {distributed_by}
""".format(**locals())
DEBUG.plpy_execute(dbscan_leaves_sql, report_segment_tracebacks=True)
plpy.execute("""
DROP TABLE IF EXISTS
internal_{segmented_source},
external_{segmented_source}
""".format(**locals()))
cluster_map = 'cluster_map' + unique_str
noise_point = label.NOISE_POINT
# Replace local cluster_id's in each leaf with
# global cluster_id's which are unique across all leaves
gen_cluster_map_sql = """
CREATE TEMP TABLE {cluster_map} AS
SELECT
dist_id,
__dist_id__,
cluster_id AS local_id,
ROW_NUMBER() OVER(
ORDER BY dist_id, cluster_id
) AS global_id
FROM {leaf_clusters_table}
WHERE dist_id = leaf_id AND cluster_id != {noise_point}
GROUP BY __dist_id__, dist_id, cluster_id
{distributed_by}
""".format(**locals())
plpy.execute(gen_cluster_map_sql)
globalize_cluster_ids_sql = """
UPDATE {leaf_clusters_table} lc
SET cluster_id = global_id
FROM {cluster_map} cm WHERE
cm.dist_id = lc.dist_id AND
cluster_id = local_id;
""".format(**locals())
DEBUG.plpy.execute(globalize_cluster_ids_sql)
intercluster_edges = 'intercluster_edges' + unique_str
dist_by_src = '' if is_platform_pg() else 'DISTRIBUTED BY (src)'
# Build intercluster table of edges connecting clusters
# on different leaves
intercluster_edge_sql = """
CREATE TEMP TABLE {intercluster_edges} AS
SELECT
internal.cluster_id AS src,
external.cluster_id AS dest
FROM {leaf_clusters_table} internal
JOIN {leaf_clusters_table} external
USING (id)
WHERE internal.is_core_point
AND internal.leaf_id = internal.dist_id
AND external.leaf_id != external.dist_id
GROUP BY src,dest
{dist_by_src}
""".format(**locals())
DEBUG.plpy.execute(intercluster_edge_sql)
res = plpy.execute("SELECT count(*) FROM {intercluster_edges}".format(**locals()))
if int(res[0]['count']) > 0:
wcc_table = 'wcc' + unique_str
plpy.execute("""
DROP TABLE IF EXISTS
{wcc_table}, {wcc_table}_summary,
{self.output_table}
""".format(**locals()))
# Find connected components of intercluster_edge_table
wcc(schema_madlib, cluster_map, 'global_id', intercluster_edges,
'src=src,dest=dest', wcc_table, None)
# Rename each cluster_id to its component id in the intercluster graph
merge_clusters_sql = """
UPDATE {leaf_clusters_table} lc
SET cluster_id = wcc.component_id
FROM {wcc_table} wcc
WHERE lc.cluster_id = wcc.global_id
""".format(**locals())
plpy.execute(merge_clusters_sql)
plpy.execute("""
DROP TABLE IF EXISTS
{wcc_table}, {wcc_table}_summary
""".format(**locals()))
add_crossborder_points_sql = """
CREATE TABLE {self.output_table} AS
SELECT
id, point, cluster_id, is_core_point
FROM (
SELECT
id,
point,
is_core_point,
leaf_id = dist_id AS is_internal,
dist_id,
CASE WHEN cluster_id = {noise_point}
THEN MAX(cluster_id) OVER(PARTITION BY id)
ELSE cluster_id
END
FROM {leaf_clusters_table}
) x WHERE is_internal AND cluster_id != {noise_point}
""".format(**locals())
DEBUG.plpy.execute(add_crossborder_points_sql)
plpy.execute("""
DROP TABLE IF EXISTS {rowcount_table},
{leaf_clusters_table}, {cluster_map},
{intercluster_edges}
""".format(**locals()))
def build_optimized_tree(self):
eps = self.eps
num_segs = self.num_segs
num_points = self.num_points
target = num_points / num_segs
source_table = self.source_table
leaf_bounds_table = 'leaf_bounds' + self.unique_str
max_depth = self.max_depth
n_dims = self.n_dims
dist_map = self.dist_map
distributed_by = self.distributed_by
internal_points = 'internal_' + self.segmented_source
external_points = 'external_' + self.segmented_source
next_cut = 'next_cut' + self.unique_str
prev_node = 'prev_node' + self.unique_str
first_cut_sql = """
DROP TABLE IF EXISTS {prev_node};
CREATE TEMP TABLE {prev_node} AS
WITH world_bounds AS (
SELECT
i,
min(point[i]) AS lower,
max(point[i])+{eps} AS upper,
max(point[i])+{eps} - min(point[i]) AS size,
floor((max(point[i])+{eps} - min(point[i])) / {eps})::INT AS eps_bins
FROM generate_series(1,{n_dims}) AS i, {source_table}
GROUP BY i
),
density_map AS (
SELECT i, eps_bin,
eps_bins,
COALESCE(density, 0) AS density
FROM (
SELECT i, eps_bins,
generate_series(0, eps_bins - 1) AS eps_bin
FROM world_bounds
) g LEFT JOIN (
SELECT i,
floor((point[i] - lower)/{eps})::INT AS eps_bin,
eps_bins,
COUNT(*) AS density
FROM
world_bounds,
{source_table}
GROUP BY i, eps_bin, eps_bins
) a
USING (i, eps_bin, eps_bins)
),
loss_table AS (
SELECT i, eps_bin,
left_internal,
{num_points} as points_in_node,
left_external,
right_external,
{self.schema_madlib}.__dbscan_segmentation_loss(
left_internal,
{num_points} - left_internal,
left_external,
right_external,
{num_segs}::BIGINT,
V,
{eps}::DOUBLE PRECISION,
{n_dims}::BIGINT,
eps_bin::DOUBLE PRECISION,
eps_bins::DOUBLE PRECISION
) AS losses
FROM (
SELECT i, eps_bin, eps_bins,
{num_segs}::BIGINT AS num_segs,
COALESCE(density, 0) AS right_external,
COALESCE(LEAD(density) OVER(PARTITION BY i ORDER BY eps_bin), 0) AS left_external,
SUM(density) OVER(PARTITION BY i ORDER BY eps_bin)::BIGINT AS left_internal
FROM (
SELECT i, generate_series(0, eps_bins - 1) AS eps_bin FROM world_bounds
) g LEFT JOIN density_map USING(i, eps_bin)
) params,
(
SELECT {self.schema_madlib}.prod(size) AS V
FROM world_bounds
) AS volume
),
optimize AS (
SELECT i, lower, cutoff, upper,
left_avail, right_avail,
left_internal, right_internal
FROM (
SELECT
i,
CASE WHEN (losses).right_avail = 0
THEN
points_in_node
ELSE
left_internal
END AS left_internal,
CASE WHEN (losses).right_avail = 0
THEN
0::BIGINT
ELSE
points_in_node - left_internal
END AS right_internal,
(losses).left_avail AS left_avail,
(losses).right_avail AS right_avail,
GREATEST((losses).left_loss,(losses).right_loss) AS total_loss,
min(
GREATEST((losses).left_loss,(losses).right_loss)
) OVER() AS total_min,
wb.lower,
CASE WHEN (losses).right_avail = 0
THEN
wb.upper
ELSE
wb.lower + {eps}*(eps_bin+1)
END AS cutoff,
wb.upper
FROM loss_table JOIN world_bounds wb USING(i)
) a WHERE total_loss = total_min LIMIT 1
)
SELECT s.id,
ARRAY[i] AS coords,
CASE WHEN s.point[i] < cutoff
THEN left_avail
ELSE right_avail
END AS avail_segs,
CASE WHEN point[i] < cutoff
THEN left_internal
ELSE right_internal
END AS points_in_node,
CASE WHEN s.point[i] < cutoff
THEN ARRAY[lower]
ELSE ARRAY[cutoff]
END AS lower_bounds,
CASE WHEN s.point[i] < cutoff
THEN ARRAY[cutoff]
ELSE ARRAY[upper]
END AS upper_bounds,
(s.point[i] >= cutoff)::INT AS node_id
FROM optimize, {source_table} s
""".format(**locals())
plpy.execute(first_cut_sql)
segment_source_sql = """
CREATE TEMP TABLE {next_cut} AS
WITH node_bounds AS (
SELECT
i,
node_id,
avail_segs,
points_in_node,
min(point[i]) AS lower,
max(point[i])+{eps} AS upper,
max(point[i])+{eps} - min(point[i]) AS size,
floor((max(point[i])+{eps} - min(point[i])) / {eps})::INT AS eps_bins
FROM generate_series(1, {n_dims}) AS i,
{prev_node} JOIN {source_table} USING (id)
GROUP BY 1,2,3,4
), density_map AS (
SELECT i, node_id, eps_bin,
COALESCE( density, 0) AS density
FROM (
SELECT node_id, i,
generate_series(0, eps_bins - 1) AS eps_bin
FROM node_bounds
) g LEFT JOIN (
SELECT node_id, i,
floor((point[i] - lower)/{eps})::INT AS eps_bin,
COUNT(*) AS density
FROM
node_bounds JOIN {prev_node} USING (node_id)
JOIN {source_table} USING (id)
GROUP BY node_id, i, eps_bin
) a
USING(i, node_id, eps_bin)
), params AS (
SELECT i, node_id, eps_bin, eps_bins,
COALESCE(density, 0) AS right_external,
COALESCE(LEAD(density) OVER(PARTITION BY node_id,i ORDER BY eps_bin), 0) AS left_external,
SUM(density) OVER(PARTITION BY node_id,i ORDER BY eps_bin)::BIGINT AS left_internal
FROM (
SELECT i, node_id, eps_bins, generate_series(0, eps_bins - 1) AS eps_bin FROM node_bounds
) g LEFT JOIN density_map USING(i, node_id, eps_bin)
), volume AS (
SELECT
node_id,
avail_segs,
points_in_node,
{self.schema_madlib}.prod(size) AS V
FROM node_bounds
WHERE i=1
GROUP BY 1,2,3
), loss_table AS (
SELECT i, node_id, eps_bin,
left_internal,
points_in_node - left_internal AS right_internal,
left_external,
right_external,
{self.schema_madlib}.__dbscan_segmentation_loss(
left_internal,
points_in_node,
left_external,
right_external,
avail_segs,
V,
{eps}::DOUBLE PRECISION,
{n_dims}::BIGINT,
eps_bin::DOUBLE PRECISION,
eps_bins::DOUBLE PRECISION
) AS losses
FROM params JOIN volume USING (node_id)
), optimize AS (
SELECT * FROM (
SELECT
node_id,
i,
(losses).left_avail,
(losses).right_avail,
CASE WHEN (losses).right_avail = 0
THEN
points_in_node
ELSE
left_internal
END AS left_internal,
CASE WHEN (losses).right_avail = 0
THEN
0::BIGINT
ELSE
points_in_node - left_internal
END AS right_internal,
node_bounds.lower,
CASE WHEN (losses).right_avail = 0
THEN
node_bounds.upper
ELSE
node_bounds.lower + {eps}*(eps_bin+1)
END AS cutoff,
node_bounds.upper,
ROW_NUMBER() OVER(
PARTITION BY node_id
ORDER BY GREATEST((losses).left_loss,(losses).right_loss)
) AS loss_rank
FROM loss_table JOIN node_bounds USING(i, node_id)
) a WHERE loss_rank = 1
)
SELECT id, coords || i AS coords,
CASE WHEN point[i] < opt.cutoff
THEN left_avail
ELSE right_avail
END AS avail_segs,
CASE WHEN point[i] < opt.cutoff
THEN left_internal
ELSE right_internal
END AS points_in_node,
lower_bounds ||
CASE WHEN point[i] < opt.cutoff
THEN
lower
ELSE
opt.cutoff
END AS lower_bounds,
opt.cutoff,
upper_bounds ||
CASE WHEN point[i] < opt.cutoff
THEN
opt.cutoff
ELSE
upper
END AS upper_bounds,
(node_id << 1) | (point[i] >= opt.cutoff)::INT AS node_id
FROM {source_table}
JOIN {prev_node}
USING (id)
JOIN optimize opt
USING (node_id)
""".format(**locals())
for i in range(max_depth):
DEBUG.plpy.info(segment_source_sql)
plpy.execute(segment_source_sql)
plpy.execute("""
DROP TABLE IF EXISTS {prev_node};
ALTER TABLE {next_cut} RENAME TO {prev_node}
""".format(**locals()))
if is_platform_pg():
# For postgres, there should only be 1 node,
# so we assign it dist_id = __dist_id__ = 0
plpy.execute("""
CREATE TABLE {dist_map} AS
SELECT 0::BIGINT AS dist_id, 0::BIGINT as __dist_id__
""".format(**locals()))
else:
# Create implicit many-to-1 map from __dist_id__ to seg_id
temp_dist_table = 'dist_table' + self.unique_str
create_dist_table_sql = """
CREATE TEMP TABLE {temp_dist_table} AS
SELECT generate_series(1,10000) AS __dist_id__
{distributed_by}
""".format(**locals())
plpy.execute(create_dist_table_sql)
# Create a 1-1 map from dist_id to __dist_id__,
# where dist_id's are re-ordered so they are evenly
# distributed among segments. This should work even
# if there are gaps in the dist_id assignments (some
# dist_id's missing)
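# Illustration (hypothetical dist_id values): if the surviving leaves
# have dist_id's {2, 5, 11}, leaves_sql below numbers them with
# leaf_num 1, 2 and 3, and each leaf_num is then paired with one
# __dist_id__ row drawn from the temp dist table above, so the three
# leaves tend to land on three different segments despite the gaps in
# the dist_id sequence.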
leaves_sql = """
SELECT
node_id AS dist_id,
ROW_NUMBER() OVER() AS leaf_num
FROM {prev_node}
GROUP BY node_id
""".format(**locals())
leaves_cnt = plpy.execute("""
WITH leaves as ({leaves_sql})
SELECT COUNT(*) cnt from leaves
""".format(**locals()))[0]['cnt']
create_dist_map_sql = """
CREATE TEMP TABLE {dist_map} AS
WITH leaves as (
{leaves_sql}
), segmap AS (
SELECT
gp_segment_id AS seg_id,
ROW_NUMBER() OVER(
ORDER BY ctid, gp_segment_id
) AS leaf_num,
__dist_id__
FROM {temp_dist_table}
LIMIT {leaves_cnt}
)
SELECT dist_id, __dist_id__
FROM leaves JOIN segmap
USING (leaf_num)
{distributed_by}
""".format(**locals())
plpy.execute(create_dist_map_sql)
plpy.execute("""
DROP TABLE IF EXISTS {temp_dist_table}
""".format(**locals()))
gen_internal_points_sql = """
CREATE TEMP TABLE {internal_points} AS
SELECT id, point, node_id AS leaf_id, node_id AS dist_id, __dist_id__
FROM {source_table}
JOIN {prev_node}
USING (id)
JOIN {dist_map}
ON node_id = dist_id
{distributed_by}
""".format(**locals())
DEBUG.plpy.execute(gen_internal_points_sql)
plpy.execute("""
DROP TABLE IF EXISTS {prev_node}
""".format(**locals()))
generate_leaf_bounds_sql = """
CREATE TEMP TABLE {leaf_bounds_table} AS
SELECT
array_agg(min ORDER BY i) AS leaf_lower,
array_agg(max ORDER BY i) AS leaf_upper,
array_agg(min - {eps} ORDER BY i) AS dist_lower,
array_agg(max + {eps} ORDER BY i) AS dist_upper,
leaf_id,
__dist_id__
FROM (
SELECT
leaf_id,
i,
min(point[i]),
max(point[i]),
count(*)
FROM {internal_points}, generate_series(1, {n_dims}) AS i
GROUP BY leaf_id, i
) x
JOIN {dist_map}
ON leaf_id = dist_id
GROUP BY leaf_id, __dist_id__
{distributed_by}
""".format(**locals())
DEBUG.plpy.execute(generate_leaf_bounds_sql)
gen_external_points_sql = """
CREATE TEMP TABLE {external_points} AS
SELECT i.id, i.point, i.leaf_id, b.leaf_id AS dist_id, b.__dist_id__
FROM {leaf_bounds_table} b JOIN {internal_points} i
ON b.leaf_id != i.leaf_id
WHERE 0 <= all({self.schema_madlib}.array_sub(point, dist_lower)) AND
0 < all({self.schema_madlib}.array_sub(dist_upper, point))
{distributed_by}
""".format(**locals())
plpy.execute(gen_external_points_sql)
plpy.execute("""
DROP TABLE IF EXISTS
{leaf_bounds_table}, {dist_map}
""".format(**locals()))
def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
expr_point, output_table, **kwargs):
with MinWarning("warning"):
_validate_dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
expr_point, output_table)
dbscan_summary_table = add_postfix(dbscan_table, '_summary')
summary = plpy.execute("SELECT * FROM {0}".format(dbscan_summary_table))[0]
eps = summary['eps']
metric = summary['metric']
sql = """
CREATE TABLE {output_table} AS
WITH src AS (
SELECT ({id_column}) AS id, ({expr_point}) AS point FROM {source_table}
)
SELECT __q1__.id, cluster_id, distance
FROM (
SELECT __t2__.id, cluster_id,
min({schema_madlib}.{metric}(__t1__.point,
__t2__.point)) as distance
FROM {dbscan_table} AS __t1__, src AS __t2__
WHERE is_core_point = TRUE
GROUP BY __t2__.id, cluster_id
) __q1__ WHERE distance <= {eps}
""".format(**locals())
result = plpy.execute(sql)
class label:
UNLABELLED = 0
NOISE_POINT = -1
pstats = None
class dbscan_perfstats:
def __init__(self):
self.range_query_calls = 0
self.range_query_time = 0.0
self.range_query_candidates = 0
self.range_query_neighbors = 0
def show(self, msg):
DEBUG.plpy.info("{}_stats: range_query total calls = {}"
.format(msg, self.range_query_calls))
range_query_calls = self.range_query_calls if self.range_query_calls else 0.00001
DEBUG.plpy.info("{}_stats: range_query total time = {}s"
.format(msg, self.range_query_time))
DEBUG.plpy.info("{}_stats: range_query total candidates = {}"
.format(msg, self.range_query_candidates))
DEBUG.plpy.info("{}_stats: range_query total neighbors returned = {}"
.format(msg, self.range_query_neighbors))
DEBUG.plpy.info("{}_stats: range_query avg time = {}s"
.format(msg, self.range_query_time / range_query_calls))
DEBUG.plpy.info("{}_stats: range_query avg candidates returned = {}"
.format(msg, self.range_query_candidates * 1.0 / range_query_calls))
DEBUG.plpy.info("{}_stats: range_query avg neighbors returned = {}"
.format(msg, self.range_query_neighbors * 1.0 / range_query_calls))
self.__init__() # Clear counters and start over
def within_range_tf(point, candidate_ids, candidate_points,
metric_cutoff, norm=2):
# Experimental TensorFlow variant of the range query; not called from the
# production code paths.  Assumes TensorFlow 1.x semantics and an active
# session `sess` in the enclosing scope (an assumption, not defined here).
import tensorflow as tf
sd = tf.math.squared_difference(tf.constant(candidate_points), tf.constant(point))
distances = tf.math.reduce_sum(sd, 1)
mask = (distances <= metric_cutoff).eval(session=sess)
return candidate_ids[mask]
def within_range_np_linalg(point, candidate_ids, candidate_points,
eps, norm=2):
distances = np.linalg.norm(candidate_points - point, ord=norm, axis=1)
return candidate_ids[distances <= eps]
def within_range_np(point, candidate_ids, candidate_points,
metric_cutoff, norm=2):
if norm==1:
distances = np.sum(np.abs(candidate_points - point), axis=1)
else:
distances = np.sum(np.square(candidate_points - point), axis=1)
return candidate_ids[distances <= metric_cutoff]
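# Minimal sketch of how the numpy range helpers above are used (values
# are hypothetical):
#
#   point = np.array([0.0, 0.0])
#   candidate_ids = np.array([10, 11, 12])
#   candidate_points = np.array([[0.1, 0.0], [3.0, 4.0], [0.0, 0.9]])
#
#   # Euclidean distances are [0.1, 5.0, 0.9]; with eps=2.0 the first call
#   # takes eps directly, while the second expects the squared cutoff
#   # eps*eps = 4.0 for norm=2.  Both return array([10, 12]).
#   within_range_np_linalg(point, candidate_ids, candidate_points, 2.0)
#   within_range_np(point, candidate_ids, candidate_points, 4.0, norm=2)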
class dbscan_record(object):
def __init__(self, obj, eps=None):
if eps is not None:
self._init_from_dict(obj, eps)
else:
self._clone(obj)
@staticmethod
def from_dict(d, eps):
if d['leaf_id'] == d['dist_id']:
return dbscan_internal_record(d, eps)
else:
return dbscan_external_record(d, eps)
def _init_from_dict(self, d, eps):
"""
dbscan_record dict constructor
The only reason for passing eps here is so we
can pre-compute the point's neighborhood for later
"""
self.id = d['id']
self.point = d['point']
self.is_core_point = False
self.cluster_id = label.UNLABELLED
self.leaf_id = d['leaf_id']
self.dist_id = d['dist_id']
# private members (for convenience, not for output)
p = np.array(self.point)
self._np_point = p
self._bounds = np.concatenate([p, p])
self._neighborhood = np.concatenate([p - eps, p + eps])
def _clone(self, rec):
"""
dbscan_record copy constructor - we call this
just before outputting any row. For external
points, we have to clone it each time so that
we can output multiple rows. For internal points,
it's just a quick way of cleaning it up (removing
the extra private vars).
"""
# Copy only public members
self.id = rec.id
self.point = rec.point
self.is_core_point = rec.is_core_point
self.cluster_id = rec.cluster_id
self.leaf_id = rec.leaf_id
self.dist_id = rec.dist_id
def __repr__(self):
fmt = 'id={},point[{}],cluster_id={},core={},leaf_id={},dist_id={}'
rep = fmt.format(self.id, len(self.point), self.cluster_id,
self.is_core_point, self.leaf_id, self.dist_id)
return 'dbscan_record({})'.format(rep)
def __eq__(self, other):
if not isinstance(other, dbscan_record):
return False
return self.__dict__ == other.__dict__
class dbscan_internal_record(dbscan_record):
def __init__(self, obj, eps=None):
dbscan_record.__init__(self, obj, eps)
def _init_from_dict(self, d, eps):
"""
private members specific to internal records
"""
dbscan_record._init_from_dict(self, d, eps)
self._external_neighbors = []
self._internal_neighbor_count = 0
self._possible_border_points = []
def not_core_point(self, min_samples):
"""
Computes self.is_core_point from internal and
external neighbor counts, and returns its negation
"""
ext_neighbors = len(self._external_neighbors)
int_neighbors = self._internal_neighbor_count
num_neighbors = ext_neighbors + int_neighbors
self.is_core_point = (num_neighbors + 1 >= min_samples)
return not self.is_core_point
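# Worked example of the core-point test above (hypothetical counts): with
# min_samples = 5, a point that found 3 internal and 1 external neighbors
# has num_neighbors = 4; counting the point itself, 4 + 1 >= 5, so it is
# marked as a core point.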
class dbscan_external_record(dbscan_record):
def __init__(self, obj, eps=None):
dbscan_record.__init__(self, obj, eps)
def _init_from_dict(self, d, eps):
"""
private members specific to external records
"""
dbscan_record._init_from_dict(self, d, eps)
self._nearby_clusters = set()
class LeafStorage:
"""
LeafStorage
This is a per-leaf storage class for dbscan_leaf().
There are 3 containers within LeafStorage:
rtree - a spatial index of the id's of the leaf's internal points
records - a hash table for looking up dbscan_internal_records by id
external_records - an ordered list of dbscan_external_records
Methods:
add_point() - adds a point to LeafStorage
range_query() - returns a list of internal point id's within eps of a point
update_neighbors() - updates internal neighbor counts based on a point and
list of neighboring internal points
label_point() - assigns a point to a specific cluster, or marks it as noise
yield_external_records() - clones and yields a copy of each dbscan_external_record
based on its list of nearby_clusters
"""
def __init__(self, dimension, dtype, eps, metric, min_samples):
self.n_dims = dimension
self.dtype = dtype
self.eps = eps
self.metric = metric
self.min_samples = min_samples
props = index.Property(dimension=dimension)
self.props = props
self.rtree = index.Index(properties=props)
self.records = {}
self.external_records = []
def add_point(self, rec):
if rec.leaf_id == rec.dist_id:
self.rtree.add(rec.id, rec._bounds)
self.records[rec.id] = rec
else:
self.external_records.append(rec)
def range_query(self, db_rec):
"""
Returns the ids (as a numpy array) of all internal points within
eps distance of db_rec's point.  For an internal record, the point
is first removed from the rtree and the neighbor counts of the
points found are updated; for an external record, it is registered
as an external neighbor of every internal point found.
"""
global pstats
pstats.range_query_calls += 1
start_time = time()
eps = self.eps
metric = self.metric
records = self.records
rtree = self.rtree
if metric not in ('squared_dist_norm2', 'dist_norm2', 'dist_norm1'):
plpy.error("Sorry, optimized method does not support {} metric yet".format(metric))
norm = 1 if metric == 'dist_norm1' else 2
metric_cutoff = eps if metric == 'dist_norm1' else eps*eps
if isinstance(db_rec, dbscan_internal_record):
# As long as we update the neighbor counts
# of a core point's neighbors before returning,
# we can safely remove it from the rtree for
# future searches. If this is a non-core point,
# we still delete it, but add its id to the list of
# _possible_border_points in the db_rec of each
# of its unlabelled neighbors
self.rtree.delete(db_rec.id, db_rec._bounds)
DEBUG.start_timing('rtree_count')
n = rtree.count(db_rec._neighborhood)
DEBUG.print_timing('rtree_count')
DEBUG.start_timing('rtree_intersection')
ids = rtree.intersection(db_rec._neighborhood, objects=False)
DEBUG.print_timing('rtree_intersection')
candidate_ids = np.empty(n, dtype='int64')
candidate_points = np.empty([n, self.n_dims], dtype=self.dtype)
i = 0
for id in ids:
candidate_ids[i] = id
candidate_points[i] = records[id]._np_point
i += 1
# Avoid catastrophic issues which are extremely difficult to debug
assert i == n
pstats.range_query_candidates += n
DEBUG.start_timing('within_range_np_linalg')
neighbors = within_range_np_linalg(
db_rec._np_point,
candidate_ids,
candidate_points,
eps,
norm
)
DEBUG.print_timing('within_range_np_linalg')
pstats.range_query_neighbors += len(neighbors)
end_time = time()
pstats.range_query_time += (end_time - start_time)
if isinstance(db_rec, dbscan_internal_record):
# Necessary because we are deleting it from the rtree
# (won't show up in future searches)
self.update_neighbors(db_rec, neighbors)
neighbors = np.concatenate([neighbors, np.array(db_rec._possible_border_points, dtype='int64')])
elif isinstance(db_rec, dbscan_external_record):
for id in neighbors:
records[id]._external_neighbors.append(db_rec)
else:
raise Exception("range_query() received unexpected type {}".format(type(db_rec)))
return neighbors
def update_neighbors(self, db_rec, neighbors):
db_rec._internal_neighbor_count += len(neighbors)
not_core_point = db_rec.not_core_point(self.min_samples)
for n_id in neighbors:
n_rec = self.records[n_id]
n_rec._internal_neighbor_count += 1
if not_core_point and n_rec.cluster_id == label.UNLABELLED:
n_rec._possible_border_points.append(db_rec.id)
def label_point(self, rec, cluster):
"""
Labels a point as a member of a particular cluster, or if
cluster = None as a NOISE_POINT.
"""
rec.cluster_id = cluster
if isinstance(rec, dbscan_internal_record):
if cluster != label.NOISE_POINT:
# Mark nearby external points as near this cluster
for ext_neighbor in rec._external_neighbors:
ext_neighbor._nearby_clusters.add(cluster)
def yield_external_records(self):
external_records = self.external_records
for rec in external_records:
for cluster_id in rec._nearby_clusters:
rec_clone = dbscan_external_record(rec)
self.label_point(rec_clone, cluster_id)
yield rec_clone
# For debugging only: when True, dbscan_leaf() materializes its results into
# a list so that tracebacks from the segments are forwarded to the caller.
DEBUG_WITH_TRACEBACKS = False
def dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points,
num_external_points, SD, **kwargs):
res = _dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points,
num_external_points, SD)
if DEBUG_WITH_TRACEBACKS:
ret = []
for i, db_rec in enumerate(res):
ret.append(db_rec) # Returning a generator breaks the traceback forwarding
return ret
else:
return res
def _dbscan_leaf(db_rec, eps, min_samples, metric, num_internal_points,
num_external_points, SD):
"""
Performs classic dbscan algorithm in parallel on each leaf
"""
global pstats
num_points = num_internal_points + num_external_points
db_rec = dbscan_record.from_dict(db_rec, eps)
id = db_rec.id
dist_id = db_rec.dist_id
if dist_id in SD:
leaf = SD[dist_id]
else:
pstats = dbscan_perfstats()
DEBUG.start_timing("dbscan_leaf")
DEBUG.start_timing("dbscan_leaf_load_rtree")
leaf = LeafStorage( len(db_rec.point),
db_rec._np_point.dtype,
eps,
metric,
min_samples
)
SD[dist_id] = leaf
records = leaf.records
external_records = leaf.external_records
leaf.add_point(db_rec)
if len(records) == num_internal_points and \
len(external_records) == num_external_points:
DEBUG.print_timing("dbscan_leaf_load_rtree")
DEBUG.start_timing("dbscan_leaf_external")
# Find all internal neighbors of external points
for ext_rec in external_records:
DEBUG.start_timing("external_rangequery")
# We don't need to save the result; we only care about
# the side effect of adding this point to the
# _external_neighbors list of each internal neighbor found
_ = leaf.range_query(ext_rec)
DEBUG.print_timing("external_rangequery")
DEBUG.print_timing("dbscan_leaf_external")
pstats.show("external")
pstats = dbscan_perfstats()
# Classify internal points into clusters (main dbscan algorithm),
# and yield resulting internal records
DEBUG.start_timing("dbscan_leaf_internal")
cluster = 0
for rec in records.values():
if rec.cluster_id != label.UNLABELLED:
continue
DEBUG.start_timing('internal_rangequery')
neighbors = leaf.range_query(rec)
DEBUG.print_timing('internal_rangequery')
if not rec.is_core_point: # rec.is_core_point field will be set inside range_query(rec)
leaf.label_point(rec, label.NOISE_POINT)
continue
cluster += 1
leaf.label_point(rec, cluster)
yield dbscan_record(rec)
queue = deque([neighbors])
DEBUG.start_timing('internal_per_cluster')
while len(queue) > 0:
for next_id in queue.popleft():
rec2 = records[next_id]
if rec2.cluster_id != label.NOISE_POINT and \
rec2.cluster_id != label.UNLABELLED:
# We delete each point from the rtree as soon
# as we assign it to a cluster.
# But we will still get here if the same point
# gets returned by more than one range query
# and added to the queue before the first reference
# to it gets popped and processed. The point will
# be labelled as a part of this cluster the first time
# it gets popped. We can ignore any duplicate references
# to it that get popped after that.
continue
DEBUG.start_timing('internal_rangequery')
neighbors = leaf.range_query(rec2)
DEBUG.print_timing('internal_rangequery')
if not rec2.is_core_point: # rec2.is_core_point field will be set inside range_query(rec2)
# Label as border point
leaf.label_point(rec2, cluster)
yield dbscan_record(rec2)
continue
queue.append(neighbors)
# Label as core point
leaf.label_point(rec2, cluster)
yield dbscan_record(rec2)
DEBUG.print_timing('internal_per_cluster')
DEBUG.print_timing("dbscan_leaf_internal")
pstats.show("internal")
# Return rows for external records (one for each
# cluster an external point is close to)
DEBUG.start_timing("yield_external_records")
for ext_rec in leaf.yield_external_records():
yield ext_rec
DEBUG.print_timing("yield_external_records")
# Clean up for next time, if there is one
del SD[dist_id]
DEBUG.print_timing("dbscan_leaf")
# The snowflake table is used to reduce the size of the edge table.
# Note that the sole purpose of the edge table is to find the connected
# components, which means removing some of the edges is fine as long as
# each component stays intact.
# We call it snowflake because the end result will look like a point in the
# middle with a bunch of edges coming out of it to the other connected points.
# Example:
# Edges: [1,2] [1,3] [1,4] [2,3] [2,4] [3,1] [3,4] [5,6] [6,7] [7,8] [7,5]
# Result: 1 and 5 are snowflake cores
# Edges: [1,2] [1,3] [1,4] [5,6] [5,7] [5,8]
# This is a proto-wcc operation (creates connected components) but we still
# need to run the actual wcc to combine snowflakes from different segments.
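# Hedged illustration of sf_transition() below (edge values hypothetical):
# once a segment has buffered all of its n_rows edges, say
#   [[1,2], [2,3], [5,6]]
# the union-find style pass groups the vertices into components {1,2,3}
# and {5,6}; one vertex per component becomes the snowflake center and
# star edges such as
#   [[1,2], [1,3], [5,6]]
# are returned (which vertex ends up as the center depends on iteration
# order).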
def sf_transition(state, src, dest, n_rows, gp_segment_id, **kwargs):
SD = kwargs['SD']
if not state:
data = []
SD['counter'] = 0
else:
data = SD['data']
counter = SD['counter']
data.append([src,dest])
ret = [[-1,-1],[-1,-1]]
my_n_rows = n_rows[gp_segment_id]
if len(data) == my_n_rows:
cl_ids = {}
clid_counter = 1
cl_counts = {}
for i in data:
key1 = i[0]
key2 = i[1]
cl_id_k1 = cl_ids[key1] if key1 in cl_ids else None
cl_id_k2 = cl_ids[key2] if key2 in cl_ids else None
if not cl_id_k1 and not cl_id_k2:
cl_ids[key1] = clid_counter
cl_ids[key2] = clid_counter
cl_counts[clid_counter] = 2
clid_counter += 1
elif cl_id_k1 and not cl_id_k2:
cl_ids[key2] = cl_id_k1
cl_counts[cl_ids[key1]] += 1
elif not cl_id_k1 and cl_id_k2:
cl_ids[key1] = cl_id_k2
cl_counts[cl_ids[key2]] += 1
else:
if cl_id_k1 != cl_id_k2:
if cl_counts[cl_id_k1] > cl_counts[cl_id_k2]:
for chkey,chvalue in cl_ids.items():
if chvalue == cl_id_k2:
cl_ids[chkey] = cl_id_k1
cl_counts[cl_id_k1] += cl_counts[cl_id_k2]
cl_counts[cl_id_k2] = 0
else:
for chkey,chvalue in cl_ids.items():
if chvalue == cl_id_k1:
cl_ids[chkey] = cl_id_k2
cl_counts[cl_id_k2] += cl_counts[cl_id_k1]
cl_counts[cl_id_k1] = 0
if cl_ids:
running_cl_id = -1
running_sf_center = -1
ret = []
for vertex_id, vertex_cl in sorted(cl_ids.items(), key=lambda item: item[1]):
# Check if we are still in the same snowflake
if vertex_cl != running_cl_id:
running_sf_center = vertex_id
running_cl_id = vertex_cl
else:
ret.append([running_sf_center,vertex_id])
SD['data'] = data
return ret
def sf_merge(state1, state2, **kwargs):
if state1:
return state1
else:
return state2
def sf_final(state, **kwargs):
return state
def _validate_dbscan(schema_madlib, source_table, output_table, id_column,
expr_point, eps, min_samples, metric, algorithm, max_depth):
input_tbl_valid(source_table, 'dbscan')
output_tbl_valid(output_table, 'dbscan')
output_summary_table = add_postfix(output_table, '_summary')
output_tbl_valid(output_summary_table, 'dbscan')
_assert(is_var_valid(source_table, id_column),
"dbscan error: {0} is an invalid column or "
"expression for id param".format(id_column))
id_col_type = get_expr_type(id_column, source_table)
_assert(is_valid_psql_type(id_col_type, INTEGER),
"dbscan Error: unique id column or expression '{0}' in input table must"
" evaluate to an integer type.".format(id_column))
_assert(is_var_valid(source_table, expr_point),
"dbscan error: {0} is an invalid column name or "
"expression for expr_point param".format(expr_point))
point_col_type = get_expr_type(expr_point, source_table)
_assert(is_valid_psql_type(point_col_type, NUMERIC | ONLY_ARRAY),
"dbscan Error: Feature column or expression '{0}' in input table is not"
" a numeric array.".format(expr_point))
_assert(eps > 0, "dbscan Error: eps has to be a positive number")
_assert(min_samples > 0, "dbscan Error: min_samples has to be a positive number")
fn_dist_list = ['dist_norm1', 'dist_norm2', 'squared_dist_norm2',
'dist_angle', 'dist_tanimoto']
_assert(metric in fn_dist_list,
"dbscan Error: metric has to be one of the madlib defined distance functions")
_assert(algorithm == METHOD_BRUTE_FORCE or RTREE_ENABLED == 1,
"dbscan Error: Cannot use rtree without the necessary python module: rtree")
_assert(max_depth is None or max_depth >= 0,
"dbscan Error: max_segmentation_depth must be a non-negative integer or NULL")
def _validate_dbscan_predict(schema_madlib, dbscan_table, source_table,
id_column, expr_point, output_table):
input_tbl_valid(source_table, 'dbscan')
input_tbl_valid(dbscan_table, 'dbscan')
dbscan_summary_table = add_postfix(dbscan_table, '_summary')
input_tbl_valid(dbscan_summary_table, 'dbscan')
output_tbl_valid(output_table, 'dbscan')
_assert(is_var_valid(source_table, id_column),
"dbscan error: {0} is an invalid column name or "
"expression for id param".format(id_column))
id_col_type = get_expr_type(id_column, source_table)
_assert(is_valid_psql_type(id_col_type, INTEGER),
"dbscan Error: unique id column or expression '{0}' in input table must"
" evaluate to an integer type.".format(id_column))
_assert(is_var_valid(source_table, expr_point),
"dbscan error: {0} is an invalid column name or "
"expression for expr_point param".format(expr_point))
point_col_type = get_expr_type(expr_point, source_table)
_assert(is_valid_psql_type(point_col_type, NUMERIC | ONLY_ARRAY),
"dbscan Error: Feature column or expression '{0}' in train table is not"
" a numeric array.".format(expr_point))
def dbscan_help(schema_madlib, message=None, **kwargs):
"""
Help function for dbscan
Args:
@param schema_madlib
@param message: string, Help message string
@param kwargs
Returns:
String. Help/usage information
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
help_string = """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.dbscan(
source_table, -- Name of the training data table
output_table, -- Name of the output table
id_column, -- A column or expression of columns specifying a unique id for rows in the source_table
expr_point, -- Column name or expression for data points
eps, -- Maximum distance (as measured by 'metric') between two
-- points for them to be considered neighbors
min_samples, -- Minimum number of points, including the point itself,
-- within eps of a point for it to be a core point
metric, -- The name of the function to use to calculate the
-- distance
algorithm, -- The algorithm to use for dbscan: brute_force or optimized (default=optimized)
max_segmentation_depth -- This has no effect on algorithm=brute_force. For algorithm=optimized,
this limits the number of splits to be considered while dividing
the dataset into smaller spatial regions for optimal parallel performance.
The default (NULL or omitted) value means it is unlimited,
but note that the data will never be broken up into more regions than
there are segments in a Greenplum cluster, and in some cases fewer may be chosen.
Setting this to a small value will decrease up-front overhead, and can
be useful if you expect to find only a small number of densely populated
clusters, or if the dataset itself is small enough that initial planning
overhead matters more than scaling performance. This parameter affects
only performance, not the output.
);
-----------------------------------------------------------------------
OUTPUT
-----------------------------------------------------------------------
The output of the dbscan function is a table with the following columns:
id The id of each point that has been assigned to a cluster
cluster_id The id of the cluster the point has been assigned to
is_core_point Boolean column that indicates if the point is core or not
point The coordinates of each classified point
"""
else:
help_string = """
----------------------------------------------------------------------------
SUMMARY
----------------------------------------------------------------------------
DBSCAN is a density-based clustering algorithm. Given a set of points in
some space, it groups together points that are closely packed together
(points with many nearby neighbors), marking as outliers points that lie
alone in low-density regions (whose nearest neighbors are too far away).
--
For an overview on usage, run:
SELECT {schema_madlib}.dbscan('usage');
SELECT {schema_madlib}.dbscan_predict('usage');
"""
return help_string.format(schema_madlib=schema_madlib)
# ------------------------------------------------------------------------------
def dbscan_predict_help(schema_madlib, message=None, **kwargs):
"""
Help function for dbscan
Args:
@param schema_madlib
@param message: string, Help message string
@param kwargs
Returns:
String. Help/usage information
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
help_string = """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.dbscan_predict(
dbscan_table, -- Name of the table created by a previous run of dbscan()
source_table, -- Name of the table containing the points to classify
id_column, -- A column or expression providing a unique id for each row in the source_table
expr_point, -- Column name or expression for the data points
output_table -- Name of the table to store the prediction results in
);
-----------------------------------------------------------------------
OUTPUT
-----------------------------------------------------------------------
The output of dbscan_predict is a table with one row for each input point
that lies within eps of a core point, containing the point's id, its
assigned cluster_id, and the distance to the nearest core point of that
cluster
"""
else:
help_string = """
----------------------------------------------------------------------------
SUMMARY
----------------------------------------------------------------------------
DBSCAN is a density-based clustering algorithm. Given a set of points in
some space, it groups together points that are closely packed together
(points with many nearby neighbors), marking as outliers points that lie
alone in low-density regions (whose nearest neighbors are too far away).
--
For an overview on usage, run:
SELECT {schema_madlib}.dbscan('usage');
SELECT {schema_madlib}.dbscan_predict('usage');
"""
return help_string.format(schema_madlib=schema_madlib)
# ------------------------------------------------------------------------------