| #------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| #------------------------------------------------------------- |
| |
| source("./scripts/staging/entity-resolution/primitives/blocking.dml") as block; |
| source("./scripts/staging/entity-resolution/primitives/matching.dml") as match; |
| source("./scripts/staging/entity-resolution/primitives/clustering.dml") as cluster; |
| |
| |
| # Very simple entity clustering pipeline which should work relatively well for small datasets. |
| # |
| # Blocks the input dataset X into num_blocks non-overlapping regions, after sorting the dataset |
| # by the sum of its rows. This is a very simple blocking scheme which serves mainly as a baseline |
| # example and will result in non-optimal performance. However, if no blocking is needed, this |
| # can be used with num_blocks=1. |
| # |
| # Uses a threshold for similarity to link entities and clusters them by also connecting all |
| # entities in each connected component ('makes each connected component fully connected'). |
| # |
| # INPUT PARAMETERS: |
| # -------------------------------------------------------------------------------------------- |
| # NAME TYPE DEFAULT MEANING |
| # -------------------------------------------------------------------------------------------- |
| # X matrix --- A dataset for which duplicates should be found. |
| # num_blocks Integer --- How many blocks to produce. |
| # threshold Double --- Similarity threshold which is used to decide if two entities |
| # are duplicates. |
| # Output: |
| # -------------------------------------------------------------------------------------------- |
| # NAME TYPE MEANING |
| # -------------------------------------------------------------------------------------------- |
| # X_cluster matrix A symmetrical adjacency matrix for X defining the found duplicates. |
| # -------------------------------------------------------------------------------------------- |
| entity_clustering_pipeline = function(Matrix[Double] X, Integer num_blocks, Double threshold) return (Matrix[Double] X_cluster) { |
|   # Sort the matrix first; X_index is needed at the end to reindex the result. |
|   [X_index, X_sorted] = block::row_sum_sorting(X); |
|   # Perform blocking: consecutive rows of `blocks` are used below as the first row of a |
|   # block and the first row of the following block. |
|   blocks = block::naive_blocking(X_sorted, num_blocks); |
|   X_cluster = matrix(0, nrow(X_sorted), nrow(X_sorted)); |
|   # SystemDS raises false positives in the parfor dependency check here, although each |
|   # iteration addresses its own independent chunk of X_cluster. |
|   # Suppress the error with check = 0. |
|   parfor (i in 1:nrow(blocks)-1, check = 0) { |
|     block_start = as.scalar(blocks[i,]); |
|     block_end = as.scalar(blocks[i+1,])-1; |
|     X_block = X_sorted[block_start:block_end,]; |
|     X_sim = match::cosine(X_block); |
|     # Link every pair of rows whose similarity exceeds the threshold. |
|     X_match = (X_sim > threshold); |
|     X_comp = cluster::cluster_by_connected_components(X_match); |
|     # Store the similarity values masked by the clustering result in this block's |
|     # square region on the diagonal of X_cluster. |
|     X_cluster[block_start:block_end,block_start:block_end] = X_comp * X_sim; |
|   } |
|   # Reindex back the symmetrical matrix |
|   X_cluster = block::reindex_rows_and_cols(X_cluster, X_index); |
| } |
| |
| # Entity clustering pipeline using locality-sensitive hashing as a blocking algorithm to improve |
| # runtime on large datasets. The tradeoff between accuracy and performance can be configured |
| # via the num_hashtables and num_hyperplanes parameters. |
| # |
| # For more details on LSH, see: |
| # Ebraheem, Muhammad, et al. "Distributed representations of tuples for entity resolution." |
| # Proceedings of the VLDB Endowment 11.11 (2018): 1454-1467. |
| # |
| # Uses a threshold for similarity to link entities and clusters them by also connecting all |
| # entities in each connected component ('makes each connected component fully connected'). |
| # |
| # INPUT PARAMETERS: |
| # -------------------------------------------------------------------------------------------- |
| # NAME TYPE DEFAULT MEANING |
| # -------------------------------------------------------------------------------------------- |
| # X matrix --- A dataset for which duplicates should be found. |
| # num_hashtables Integer --- How often to block the dataset using random hyperplanes and |
| # compute similarities. |
| # Increases runtime and improves accuracy. |
| # num_hyperplanes Integer --- The dimensionality of the random hyperplanes. |
| # Higher values produce smaller blocks and require a higher |
| # number of num_hashtables to avoid losing accuracy. |
| # threshold Double --- Similarity threshold which is used to decide if two entities |
| # are duplicates. |
| # Output: |
| # -------------------------------------------------------------------------------------------- |
| # NAME TYPE MEANING |
| # -------------------------------------------------------------------------------------------- |
| # X_cluster matrix A symmetrical adjacency matrix for X defining the found duplicates. |
| # -------------------------------------------------------------------------------------------- |
| entity_clustering_pipeline_lsh = function(Matrix[Double] X, Integer num_hashtables, Integer num_hyperplanes, Double threshold) return (Matrix[Double] X_cluster) { |
|   # Accumulates the results of all hash tables via element-wise OR. |
|   X_cluster = matrix(0, nrow(X), nrow(X)); |
|   for (hashtable in 1:num_hashtables, check = 0) { |
|     # First get the LSH blocks |
|     [X_index, X_hash, X_sorted, blocks] = block::lsh_blocking(X, num_hyperplanes); |
|     X_cluster_local = matrix(0, nrow(X), nrow(X)); |
|     ## Perform blocking: match and cluster each block |
|     parfor (i in 1:nrow(blocks)-1, check = 0) { |
|       block_start = as.scalar(blocks[i,]); |
|       block_end = as.scalar(blocks[i+1,])-1; |
|       # NOTE(review): there is no emptiness guard here; this assumes lsh_blocking only |
|       # returns non-empty blocks (block_end >= block_start) - confirm. |
|       X_block = X_sorted[block_start:block_end,]; |
|       X_sim = match::cosine(X_block); |
|       # Link every pair of rows whose similarity exceeds the threshold. |
|       X_match = (X_sim > threshold); |
|       X_comp = cluster::cluster_by_connected_components(X_match); |
|       # OR the masked similarities into the current content of this block's region. |
|       X_new_block = X_cluster_local[block_start:block_end,block_start:block_end] | (X_comp * X_sim); |
|       # Workaround for a bug where assigning a sparse matrix multiple times leads to an error in the SystemDS runtime. |
|       if (sum(X_new_block) > 0) { |
|         X_cluster_local[block_start:block_end,block_start:block_end] = X_new_block; |
|       } |
|     } |
|     ## Reindex back the symmetrical matrix and merge it into the overall result |
|     X_cluster = X_cluster | block::reindex_rows_and_cols(X_cluster_local, X_index); |
|   } |
| } |
| |
| # Very simple binary entity resolution pipeline which computes similarity between rows of two |
| # datasets. This mainly serves as a baseline example and does not use blocking. It is not |
| # suitable for large datasets. |
| # |
| # Uses a threshold for similarity to link entities. |
| # |
| # INPUT PARAMETERS: |
| # -------------------------------------------------------------------------------------------- |
| # NAME TYPE DEFAULT MEANING |
| # -------------------------------------------------------------------------------------------- |
| # X matrix --- A dataset to be compared with Y. |
| # Y matrix --- A dataset to be compared with X. |
| # threshold Double --- Similarity threshold which is used to decide if two |
| # entities are duplicates. |
| # Output: |
| # -------------------------------------------------------------------------------------------- |
| # NAME TYPE MEANING |
| # -------------------------------------------------------------------------------------------- |
| # XY_pairs matrix An adjacency matrix defining the found duplicates between X and Y. |
| # Shape is (nrow(X), nrow(Y)). |
| # -------------------------------------------------------------------------------------------- |
| binary_entity_resolution_pipeline = function(Matrix[Double] X, Matrix[Double] Y, Double threshold) return (Matrix[Double] XY_pairs) { |
|   # Similarity between the rows of X and the rows of Y. |
|   similarity = match::cosine2(X, Y); |
|   # Apply the threshold to the similarities to obtain the result. |
|   XY_pairs = match::tresholding(similarity, threshold); |
| } |
| |
| # Binary entity resolution pipeline using locality-sensitive hashing as a blocking algorithm |
| # to improve runtime on large datasets. |
| # |
| # The tradeoff between accuracy and performance can be configured |
| # via the num_hashtables and num_hyperplanes parameters. |
| # |
| # For more details on LSH, see: |
| # Ebraheem, Muhammad, et al. "Distributed representations of tuples for entity resolution." |
| # Proceedings of the VLDB Endowment 11.11 (2018): 1454-1467. |
| # |
| # Uses a threshold for similarity to link entities. |
| # |
| # INPUT PARAMETERS: |
| # -------------------------------------------------------------------------------------------- |
| # NAME TYPE DEFAULT MEANING |
| # -------------------------------------------------------------------------------------------- |
| # X matrix --- A dataset to be compared with Y. |
| # Y matrix --- A dataset to be compared with X. |
| # num_hashtables Integer --- How often to block the dataset using random hyperplanes and |
| # compute similarities. |
| # Increases runtime and improves accuracy. |
| # num_hyperplanes Integer --- The dimensionality of the random hyperplanes. |
| # Higher values produce smaller blocks and require a higher |
| # number of num_hashtables to avoid losing accuracy. |
| # threshold Double --- Similarity threshold which is used to decide if two |
| # entities are duplicates. |
| # Output: |
| # -------------------------------------------------------------------------------------------- |
| # NAME TYPE MEANING |
| # -------------------------------------------------------------------------------------------- |
| # XY_pairs matrix An adjacency matrix defining the found duplicates between X and Y. |
| # Shape is (nrow(X), nrow(Y)). |
| # -------------------------------------------------------------------------------------------- |
| binary_entity_resolution_pipeline_lsh = function(Matrix[Double] X, Matrix[Double] Y, Integer num_hashtables, Integer num_hyperplanes, Double threshold) return (Matrix[Double] XY_pairs) { |
|   # Accumulates the pairs found by all hash tables via element-wise OR. |
|   XY_pairs = matrix(0, nrow(X), nrow(Y)); |
|   for (hashtable in 1:num_hashtables, check = 0) { |
|     [X_index, X_hash, X_sorted, Y_index, Y_hash, Y_sorted, blocks] = block::lsh_blocking2(X, Y, num_hyperplanes); |
|     XY_pairs_local = matrix(0, nrow(X), nrow(Y)); |
|     parfor (i in 1:nrow(blocks)-1, check = 0) { |
|       # Column 1 of `blocks` is used for the row range in X_sorted, column 2 for Y_sorted; |
|       # each range ends one row before the value in the following row of `blocks`. |
|       x_start = as.scalar(blocks[i,1]); |
|       x_end = as.scalar(blocks[i+1,1]) - 1; |
|       y_start = as.scalar(blocks[i,2]); |
|       y_end = as.scalar(blocks[i+1,2]) - 1; |
|       x_len = x_end - x_start + 1; |
|       y_len = y_end - y_start + 1; |
|       # Skip blocks that have no rows on either side. |
|       if (x_len > 0 & y_len > 0) { |
|         X_block = X_sorted[x_start:x_end,]; |
|         Y_block = Y_sorted[y_start:y_end,]; |
|         block_sim = match::cosine2(X_block, Y_block); |
|         block_pairs = match::tresholding(block_sim, threshold); |
|         # Workaround for a bug where assigning a sparse matrix multiple times leads to an error in the SystemDS runtime. |
|         if (sum(block_pairs) > 0) { |
|           XY_pairs_local[x_start:x_end,y_start:y_end] = block_pairs; |
|         } |
|       } |
|     } |
|     # Reindex the rows with X_index, then transpose so that the Y dimension can be |
|     # reindexed row-wise with Y_index as well. |
|     XY_rows_reindexed = block::reindex_rowwise(XY_pairs_local, X_index); |
|     XY_transposed = t(XY_rows_reindexed); |
|     XY_transposed = block::reindex_rowwise(XY_transposed, Y_index); |
|     # Transpose back to (nrow(X), nrow(Y)) and merge into the overall result. |
|     XY_pairs = XY_pairs | t(XY_transposed); |
|   } |
| } |