| #------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| #------------------------------------------------------------- |
| |
| # Builtin for deduplication using distributed representations (DRs) and |
| # locality-sensitive hashing (LSH) based blocking. |
| # |
| # The function encodes each input tuple as a dense vector using pre-trained GloVe embeddings (simple averaging), |
| # groups semantically similar tuples via LSH into buckets, and compares only those pairs for deduplication. |
| # |
| # |
| # INPUT: |
| # -------------------------------------------------------------------------------------- |
| # X Input Frame[String] with n rows and d columns (raw tuples) |
# gloveMatrix      Matrix[Double] of size |V| × e (pretrained GloVe embeddings) -> |V| number of words and e = embedding dimension
| # vocab Frame[String] of size |V| × 1 (vocabulary aligned with gloveMatrix) |
| # similarityMeasure (optional) String specifying similarity metric: "cosine", "euclidean" |
| # threshold (optional) Double: threshold value above which tuples are considered duplicates |
| # -------------------------------------------------------------------------------------- |
| # |
| # OUTPUT: |
| # -------------------------------------------------------------------------------------- |
| # Y_unique Frame[String] with deduplicated tuples |
| # (first occurrence of each duplicate group is retained) |
| # Y_duplicates Frame[String] with all detected duplicates |
| # (i.e., tuples removed from the input) |
| # -------------------------------------------------------------------------------------- |
| |
# Deduplicates the tuples in X via GloVe-based distributed representations
# and LSH blocking: semantically similar tuples are hashed into the same
# bucket, only bucket-internal pairs are scored, and for every duplicate
# group the first occurrence (lowest row index) is retained.
#
# INPUT:  X                  Frame[String] (n x d) raw tuples
#         gloveMatrix        Matrix[Double] (|V| x e) pretrained embeddings
#         vocab              Frame[String] (|V| x 1) vocabulary aligned with gloveMatrix
#         similarityMeasure  "cosine" (default) or "euclidean"
#         threshold          minimum similarity for a duplicate match
# OUTPUT: Y_unique           deduplicated tuples (first occurrence kept)
#         Y_duplicates       tuples removed from the input
f_dedup = function(Frame[String] X, Matrix[Double] gloveMatrix, Frame[String] vocab, String similarityMeasure = "cosine", Double threshold = 0.8)
return(Frame[String] Y_unique, Frame[String] Y_duplicates)
{
  # Step 1: Distributed Representations (one dense row vector per tuple)
  V = computeDRMatrix(X, vocab, gloveMatrix)

  # Step 2: generate K random LSH hyperplanes (one hash bit each)
  K = 10 # number of hash functions
  d = ncol(V)
  H = rand(rows=K, cols=d, pdf="uniform", seed=-1)

  # Steps 3-7: hash, block, generate candidate pairs, score, and threshold
  hashCodes = computeLSH(V, H)
  bucketIDs = formBuckets(hashCodes)
  pairs = findCandidatePairs(bucketIDs)
  sim = computeSimilarity(V, pairs, similarityMeasure)
  matches = filterDuplicates(pairs, sim, threshold)

  # Step 8: build a keep mask over the rows of X. Every match (i, j) has
  # i < j (findCandidatePairs emits ordered pairs), so marking the second
  # index as duplicate keeps the first occurrence of EACH duplicate group.
  # (The previous global-minimum logic kept only one row across all groups,
  # wrongly discarding the first occurrence of every other group; it also
  # applied ifelse() to empty-range slices, which fails for zero matches.)
  keepMask = matrix(1, rows=nrow(X), cols=1)
  nMatches = nrow(matches)
  if (nMatches > 0) {
    for (p in 1:nMatches) {
      dupIdx = as.scalar(matches[p, 2])
      keepMask[dupIdx, 1] = 0
    }
  }

  # Step 9: select rows by mask. removeEmpty with select replaces the
  # row-by-row rbind of the previous implementation and avoids seeding the
  # outputs with a spurious copy of X's first row (removeEmpty on the
  # non-empty slice X[1,] does NOT yield an empty frame).
  # NOTE(review): when no duplicates exist, the all-zero selection may come
  # back as a single empty row depending on removeEmpty's empty-output
  # convention -- verify against the SystemDS version in use.
  Y_unique = removeEmpty(target=X, margin="rows", select=keepMask)
  Y_duplicates = removeEmpty(target=X, margin="rows", select=(1 - keepMask))
}
| |
# Computes the distributed representation (DR) matrix for all tuples in X:
# each row is tokenized via an external Java UDF and encoded as the average
# of the GloVe embeddings of its in-vocabulary tokens. Rows without any
# vocabulary hit remain the all-zero vector.
#
# INPUT:  X           Frame[String] (n x d_raw) raw input tuples
#         vocab       Frame[String] (|V| x 1) vocabulary aligned row-wise with gloveMatrix
#         gloveMatrix Matrix[Double] (|V| x e) pretrained GloVe embeddings
# OUTPUT: V           Matrix[Double] (n x e) one averaged embedding per tuple
computeDRMatrix = function(Frame[String] X, Frame[String] vocab, Matrix[Double] gloveMatrix)
return(Matrix[Double] V)
{
  # TODO: Vectorize this implementation with dedicated transform encode permutation matrices
  n = nrow(X)
  d = ncol(gloveMatrix)  # embedding dimension e
  V = matrix(0, rows=n, cols=d) # define output matrix

  for (i in 1:n) {
    row = X[i,]
    # Tokenization delegated to a project-side Java UDF; its exact output
    # shape is not visible here. NOTE(review): assumed to be a frame of
    # tokens addressable by a single index below -- TODO confirm.
    words = transformapply(row, "UtilFunctions.cleanAndTokenizeRow")

    # running sum and count of matched token embeddings for this tuple
    sumVec = matrix(0, rows=1, cols=d)
    count = 0

    # NOTE(review): length() on a frame and single-index access words[k]
    # are unusual DML; verify this iterates over tokens as intended.
    for (k in 1:length(words)) {
      w = words[k]
      idx = -1
      found = FALSE

      # linear search for word in vocabulary; O(|V|) per token.
      # NOTE(review): comparing two frame cells without as.scalar() --
      # confirm this yields a scalar boolean in the targeted DML version.
      for (m in 1:nrow(vocab)) {
        if (!found & vocab[m,1] == w) {
          idx = m
          found = TRUE
        }
      }
      # word found: accumulate its embedding row
      if (idx > 0) {
        sumVec = sumVec + gloveMatrix[idx,]
        count = count + 1
      }
    }
    # average over matched tokens; all-zero row if nothing matched
    if (count > 0) {
      V[i,] = sumVec / count
    }
    else {
      V[i,] = sumVec
    }
  }
}
| |
# Computes binary LSH hash codes for all DR vectors.
#
# INPUT:  V Matrix[Double] (n x d) distributed representations
#         H Matrix[Double] (K x d) random hyperplane normals
# OUTPUT: hashCodes Matrix[Double] (n x K) with entries in {0, 1}
computeLSH = function(Matrix[Double] V, Matrix[Double] H)
return(Matrix[Double] hashCodes)
{
  # Project every vector onto the K hyperplanes and keep only the sign:
  # a non-negative projection maps to 1, a negative one to 0.
  hashCodes = (V %*% t(H)) >= 0
}
| |
# Maps each row of binary hash codes to a scalar bucket id by interpreting
# the K bits as a K-bit binary number (leftmost bit most significant).
#
# INPUT:  hashCodes Matrix[Double] (n x K) binary LSH codes (entries in {0,1})
# OUTPUT: bucketIDs Matrix[Double] (n x 1) integer bucket id per tuple
formBuckets = function(Matrix[Double] hashCodes)
return(Matrix[Double] bucketIDs)
{
  K = ncol(hashCodes)

  # binary place values (2^(K-1), ..., 2^0) as a K x 1 column vector,
  # computed vectorized instead of the previous per-entry loop
  powers = 2 ^ (K - seq(1, K))

  # weighted row sums: (n x K) %*% (K x 1) -> n x 1 bucket ids
  bucketIDs = hashCodes %*% powers
}
| |
# Generates all candidate pairs (i, j) with i < j whose tuples fall into the
# same LSH bucket. Only these pairs are scored for similarity later.
#
# INPUT:  bucketIDs Matrix[Double] (n x 1) bucket id per tuple
# OUTPUT: pairs     Matrix[Double] (p x 2) ordered index pairs (i < j)
findCandidatePairs = function(Matrix[Double] bucketIDs)
return(Matrix[Double] pairs)
{
  n = nrow(bucketIDs)
  pairs = matrix(0, rows=0, cols=2)

  # Guard n >= 2: in DML the predicate 1:(n-1) is a DECREASING sequence for
  # n <= 1, so the unguarded loops would execute with out-of-range indices.
  # O(n^2) comparison; TODO: sort by bucket id to restrict to runs.
  if (n >= 2) {
    for (i in 1:(n - 1)) {
      # hoist row i's bucket out of the inner loop
      bi = as.scalar(bucketIDs[i, 1])
      for (j in (i + 1):n) {
        if (bi == as.scalar(bucketIDs[j, 1])) {
          # cbind of two 1x1 matrices; the previous bracket literal
          # matrix([i, j], ...) is not valid DML syntax
          pairs = rbind(pairs, cbind(matrix(i, rows=1, cols=1), matrix(j, rows=1, cols=1)))
        }
      }
    }
  }
}
| |
# Computes a similarity score for every candidate pair.
#   "cosine":    cosine similarity in [-1, 1]; zero-norm vectors (tuples
#                with no vocabulary hit) are scored 0 instead of NaN.
#   "euclidean": negated euclidean distance (larger = more similar), so a
#                single >= threshold test works for both measures.
#
# INPUT:  V                 Matrix[Double] (n x d) DR vectors
#         pairs             Matrix[Double] (m x 2) candidate index pairs
#         similarityMeasure "cosine" or "euclidean"
# OUTPUT: similarities      Matrix[Double] (m x 1) score per pair
computeSimilarity = function(Matrix[Double] V, Matrix[Double] pairs, String similarityMeasure)
return(Matrix[Double] similarities)
{
  # fail fast on an unsupported measure, once, outside the loop
  if (similarityMeasure != "cosine" & similarityMeasure != "euclidean") {
    stop("Unsupported similarity measure: " + similarityMeasure)
  }

  m = nrow(pairs)
  similarities = matrix(0.0, rows=m, cols=1)

  # guard m >= 1: for m = 0 the predicate 1:m would be a decreasing sequence
  if (m > 0) {
    for (k in 1:m) {
      i = as.scalar(pairs[k, 1])
      j = as.scalar(pairs[k, 2])
      vi = V[i, ]
      vj = V[j, ]

      if (similarityMeasure == "cosine") {
        dot = sum(vi * vj)
        denom = sqrt(sum(vi^2)) * sqrt(sum(vj^2))
        # a zero vector would yield 0/0 = NaN; score it as dissimilar (0)
        if (denom > 0) {
          sim = dot / denom
        }
        else {
          sim = 0.0
        }
      }
      else {
        diff = vi - vj
        sim = -1 * sqrt(sum(diff^2))
      }

      similarities[k, 1] = sim
    }
  }
}
| |
# Keeps all candidate pairs whose similarity reaches the threshold.
#
# INPUT:  pairs        Matrix[Double] (m x 2) candidate index pairs
#         similarities Matrix[Double] (m x 1) similarity score per pair
#         threshold    Double minimum similarity for a duplicate match
# OUTPUT: matches      Matrix[Double] (m' x 2) pairs declared duplicates
filterDuplicates = function(Matrix[Double] pairs, Matrix[Double] similarities, Double threshold)
return(Matrix[Double] matches)
{
  m = nrow(pairs)
  matches = matrix(0, rows=0, cols=2)

  # guard m >= 1: for m = 0 the predicate 1:m would be a decreasing sequence
  if (m > 0) {
    for (i in 1:m) {
      # as.scalar: the indexed cell is a 1x1 matrix, but the if-predicate
      # requires a scalar (the previous code compared a matrix to a scalar)
      s = as.scalar(similarities[i, 1])
      if (s >= threshold) {
        # pairs[i, ] is already a 1 x 2 matrix; no reshape needed
        matches = rbind(matches, pairs[i, ])
      }
    }
  }
}