blob: e6af155f0f08965aaba8edfa91f7acb7c902c3ba [file] [log] [blame]
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
source("./scripts/staging/entity-resolution/primitives/blocking.dml") as block;
source("./scripts/staging/entity-resolution/primitives/matching.dml") as match;
source("./scripts/staging/entity-resolution/primitives/clustering.dml") as cluster;
# Very simple entity clustering pipeline which should work relatively well for small datasets.
#
# Blocks the input dataset X into num_blocks non-overlapping regions, after sorting the dataset
# by the sum of its rows. This is a very simple blocking scheme which serves mainly as a baseline
# example and will result in non-optimal performance. However, if no blocking is needed, this
# can be used with num_blocks=1.
#
# Uses a threshold for similarity to link entities and clusters them by also connecting all
# entities in each connected component ('makes each connected component fully connected').
#
# INPUT PARAMETERS:
# --------------------------------------------------------------------------------------------
# NAME TYPE DEFAULT MEANING
# --------------------------------------------------------------------------------------------
# X matrix --- A dataset for which duplicates should be found.
# num_blocks Integer --- How many blocks to produce.
# threshold Double --- Similarity threshold which is used to decide if two entities
# are duplicates.
# Output:
# --------------------------------------------------------------------------------------------
# NAME TYPE MEANING
# --------------------------------------------------------------------------------------------
# X_cluster matrix A symmetrical adjacency matrix for X defining the found duplicates.
# --------------------------------------------------------------------------------------------
entity_clustering_pipeline = function(Matrix[Double] X, Integer num_blocks, Double threshold) return (Matrix[Double] X_cluster) {
  # Sort the rows of X. X_index is kept so the result can be re-indexed at the end.
  [X_index, X_sorted] = block::row_sum_sorting(X);
  # Perform blocking: 'blocks' is read below as a single-column matrix of block start rows,
  # where entry i+1 minus one is the last row of block i.
  blocks = block::naive_blocking(X_sorted, num_blocks);
  # Result in sorted order; every block writes its own diagonal sub-matrix.
  X_cluster = matrix(0, nrow(X_sorted), nrow(X_sorted));
  # SystemDS raises false positives in the parfor dependency check, although the chunks of
  # X_cluster are addressed independently. Suppress the error with check = 0.
  parfor (i in 1:nrow(blocks)-1, check = 0) {
    block_start = as.scalar(blocks[i,]);
    block_end = as.scalar(blocks[i+1,])-1;
    X_block = X_sorted[block_start:block_end,];
    # Pairwise similarity of all rows within the block.
    X_sim = match::cosine(X_block);
    # Link entities whose similarity is strictly above the threshold.
    X_match = (X_sim > threshold);
    # Cluster the linked entities by connected components.
    X_comp = cluster::cluster_by_connected_components(X_match);
    # Element-wise product: the stored values are the similarities of the pairs selected by X_comp.
    X_cluster[block_start:block_end,block_start:block_end] = X_comp * X_sim;
  }
  # Reindex back the symmetrical matrix
  X_cluster = block::reindex_rows_and_cols(X_cluster, X_index);
}
# Entity clustering pipeline using locality-sensitive hashing as a blocking algorithm to improve
# runtime on large datasets. The tradeoff between accuracy and performance can be configured
# via the num_hashtables and num_hyperplanes parameters.
#
# For more details on LSH, see:
# Ebraheem, Muhammad, et al. "Distributed representations of tuples for entity resolution."
# Proceedings of the VLDB Endowment 11.11 (2018): 1454-1467.
#
# Uses a threshold for similarity to link entities and clusters them by also connecting all
# entities in each connected component ('makes each connected component fully connected').
#
# INPUT PARAMETERS:
# --------------------------------------------------------------------------------------------
# NAME TYPE DEFAULT MEANING
# --------------------------------------------------------------------------------------------
# X matrix --- A dataset for which duplicates should be found.
# num_hashtables Integer --- How often to block the dataset using random hyperplanes and
# compute similarities.
# Increases runtime and improves accuracy.
# num_hyperplanes Integer --- The dimensionality of the random hyperplanes.
# Higher values produce smaller blocks and require a higher
# number of num_hashtables to avoid losing accuracy.
# threshold Double --- Similarity threshold which is used to decide if two entities
# are duplicates.
# Output:
# --------------------------------------------------------------------------------------------
# NAME TYPE MEANING
# --------------------------------------------------------------------------------------------
# X_cluster matrix A symmetrical adjacency matrix for X defining the found duplicates.
# --------------------------------------------------------------------------------------------
entity_clustering_pipeline_lsh = function(Matrix[Double] X, Integer num_hashtables, Integer num_hyperplanes, Double threshold) return (Matrix[Double] X_cluster) {
  # Accumulates the matches found over all hash tables.
  X_cluster = matrix(0, nrow(X), nrow(X));
  # Each iteration computes a fresh LSH blocking of X and merges the matches found in it.
  for (hashtable in 1:num_hashtables, check = 0) {
    # First get the LSH blocks. 'blocks' is read below as a single-column matrix of block
    # start rows, where entry i+1 minus one is the last row of block i.
    [X_index, X_hash, X_sorted, blocks] = block::lsh_blocking(X, num_hyperplanes);
    # Matches of this hash table, in the row order of X_sorted.
    X_cluster_local = matrix(0, nrow(X), nrow(X));
    ## Perform blocking: match and cluster each block
    parfor (i in 1:nrow(blocks)-1, check = 0) {
      block_start = as.scalar(blocks[i,]);
      block_end = as.scalar(blocks[i+1,])-1;
      # NOTE(review): there is no guard against empty blocks here; this assumes that
      # lsh_blocking never produces block_end < block_start - confirm against blocking.dml.
      X_block = X_sorted[block_start:block_end,];
      # Pairwise similarity of all rows within the block.
      X_sim = match::cosine(X_block);
      # Link entities whose similarity is strictly above the threshold.
      X_match = (X_sim > threshold);
      # Cluster the linked entities by connected components.
      X_comp = cluster::cluster_by_connected_components(X_match);
      # The logical OR combines the current content of the target region with the new matches.
      X_new_block = X_cluster_local[block_start:block_end,block_start:block_end] | (X_comp * X_sim);
      # Workaround for a bug where assigning a sparse matrix multiple times leads to an error in the SystemDS runtime.
      if (sum(X_new_block) > 0) {
        X_cluster_local[block_start:block_end,block_start:block_end] = X_new_block;
      }
    }
    ## Reindex back the symmetrical matrix
    X_cluster = X_cluster | block::reindex_rows_and_cols(X_cluster_local, X_index);
  }
}
# Very simple binary entity resolution pipeline which computes similarity between rows of two
# datasets. This mainly serves as a baseline example and does not use blocking. It is not
# suitable for large datasets.
#
# Uses a threshold for similarity to link entities.
#
# INPUT PARAMETERS:
# --------------------------------------------------------------------------------------------
# NAME TYPE DEFAULT MEANING
# --------------------------------------------------------------------------------------------
# X matrix --- A dataset to be compared with Y.
# Y matrix --- A dataset to be compared with X.
# threshold Double --- Similarity threshold which is used to decide if two
# entities are duplicates.
# Output:
# --------------------------------------------------------------------------------------------
# NAME TYPE MEANING
# --------------------------------------------------------------------------------------------
# XY_pairs matrix An adjacency matrix defining the found duplicates between X and Y.
# Shape is (nrow(X), nrow(Y)).
# --------------------------------------------------------------------------------------------
binary_entity_resolution_pipeline = function(Matrix[Double] X, Matrix[Double] Y, Double threshold) return (Matrix[Double] XY_pairs) {
  # Compute the similarity between X and Y with match::cosine2 and pass the result, together
  # with the threshold, straight to the thresholding primitive. No blocking is applied.
  XY_pairs = match::tresholding(match::cosine2(X, Y), threshold);
}
# Binary entity resolution pipeline using locality-sensitive hashing as a blocking algorithm
# to improve runtime on large datasets.
#
# The tradeoff between accuracy and performance can be configured
# via the num_hashtables and num_hyperplanes parameters.
#
# For more details on LSH, see:
# Ebraheem, Muhammad, et al. "Distributed representations of tuples for entity resolution."
# Proceedings of the VLDB Endowment 11.11 (2018): 1454-1467.
#
# Uses a threshold for similarity to link entities.
#
# INPUT PARAMETERS:
# --------------------------------------------------------------------------------------------
# NAME TYPE DEFAULT MEANING
# --------------------------------------------------------------------------------------------
# X matrix --- A dataset to be compared with Y.
# Y matrix --- A dataset to be compared with X.
# num_hashtables Integer --- How often to block the dataset using random hyperplanes and
# compute similarities.
# Increases runtime and improves accuracy.
# num_hyperplanes Integer --- The dimensionality of the random hyperplanes.
# Higher values produce smaller blocks and require a higher
# number of num_hashtables to avoid losing accuracy.
# threshold Double --- Similarity threshold which is used to decide if two
# entities are duplicates.
# Output:
# --------------------------------------------------------------------------------------------
# NAME TYPE MEANING
# --------------------------------------------------------------------------------------------
# XY_pairs matrix An adjacency matrix defining the found duplicates between X and Y.
# Shape is (nrow(X), nrow(Y)).
# --------------------------------------------------------------------------------------------
binary_entity_resolution_pipeline_lsh = function(Matrix[Double] X, Matrix[Double] Y, Integer num_hashtables, Integer num_hyperplanes, Double threshold) return (Matrix[Double] XY_pairs) {
  # Accumulates the pairs found over all hash tables.
  XY_pairs = matrix(0, nrow(X), nrow(Y));
  for (hashtable in 1:num_hashtables, check = 0) {
    # Block both datasets. Column 1 of 'blocks' is read below as the start row in X_sorted,
    # column 2 as the start row in Y_sorted; the next row minus one gives the last row.
    [X_index, X_hash, X_sorted, Y_index, Y_hash, Y_sorted, blocks] = block::lsh_blocking2(X, Y, num_hyperplanes);
    # Pairs of this hash table, addressed by the row order of X_sorted and Y_sorted.
    pairs_sorted = matrix(0, nrow(X), nrow(Y));
    parfor (i in 1:nrow(blocks)-1, check = 0) {
      x_first = as.scalar(blocks[i,1]);
      x_last = as.scalar(blocks[i+1,1]) - 1;
      y_first = as.scalar(blocks[i,2]);
      y_last = as.scalar(blocks[i+1,2]) - 1;
      # Number of rows each dataset contributes to this block.
      x_count = x_last - x_first + 1;
      y_count = y_last - y_first + 1;
      # Only compare when both sides of the block are non-empty.
      if (x_count > 0 & y_count > 0) {
        X_part = X_sorted[x_first:x_last,];
        Y_part = Y_sorted[y_first:y_last,];
        block_sim = match::cosine2(X_part, Y_part);
        block_pairs = match::tresholding(block_sim, threshold);
        # Workaround for a bug where assigning a sparse matrix multiple times leads to an error in the SystemDS runtime.
        if (sum(block_pairs) > 0) {
          pairs_sorted[x_first:x_last,y_first:y_last] = block_pairs;
        }
      }
    }
    # Re-index the rows with X_index, then transpose so the Y dimension can be re-indexed
    # row-wise with Y_index, and transpose back before merging.
    rows_reindexed = block::reindex_rowwise(pairs_sorted, X_index);
    cols_reindexed = block::reindex_rowwise(t(rows_reindexed), Y_index);
    XY_pairs = XY_pairs | t(cols_reindexed);
  }
}