blob: af2ecafcdcdfdc7ee1aef3cfdc3f1648d7d5e0d1 [file] [log] [blame]
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
# Builtin for deduplication using distributed representations (DRs) and
# locality-sensitive hashing (LSH) based blocking.
#
# The function encodes each input tuple as a dense vector using pre-trained GloVe embeddings (simple averaging),
# groups semantically similar tuples via LSH into buckets, and compares only those pairs for deduplication.
#
#
# INPUT:
# --------------------------------------------------------------------------------------
# X Input Frame[String] with n rows and d columns (raw tuples)
# gloveMatrix   Matrix[Double] of size |V| × e (pretrained GloVe embeddings) -> |V| number of words and e = embedding dimension
# vocab Frame[String] of size |V| × 1 (vocabulary aligned with gloveMatrix)
# similarityMeasure (optional) String specifying similarity metric: "cosine", "euclidean"
# threshold (optional) Double: threshold value above which tuples are considered duplicates
# --------------------------------------------------------------------------------------
#
# OUTPUT:
# --------------------------------------------------------------------------------------
# Y_unique Frame[String] with deduplicated tuples
# (first occurrence of each duplicate group is retained)
# Y_duplicates Frame[String] with all detected duplicates
# (i.e., tuples removed from the input)
# --------------------------------------------------------------------------------------
f_dedup = function(Frame[String] X, Matrix[Double] gloveMatrix, Frame[String] vocab, String similarityMeasure = "cosine", Double threshold = 0.8)
return(Frame[String] Y_unique, Frame[String] Y_duplicates)
{
  # Deduplicates the rows of X using GloVe-based distributed representations
  # and LSH blocking. For each detected duplicate pair the first occurrence
  # (smaller row index) is retained; the second is moved to Y_duplicates.

  # Step 1: Distributed Representation (DRs)
  V = computeDRMatrix(X, vocab, gloveMatrix)
  # Step 2: generate LSH hyperplanes
  K = 10 # number of hash functions
  d = ncol(V)
  H = rand(rows=K, cols=d, pdf="uniform", seed=-1)
  # Step 3: Compute LSH hashcodes
  hashCodes = computeLSH(V, H)
  # Step 4: Form buckets
  bucketIDs = formBuckets(hashCodes)
  # Step 5: Candidate pair generation
  pairs = findCandidatePairs(bucketIDs)
  # Step 6: Compute similarity for pairs
  sim = computeSimilarity(V, pairs, similarityMeasure)
  # Step 7: Filter duplicates
  matches = filterDuplicates(pairs, sim, threshold)
  # Step 8: Build keep-mask. Each match row holds (i, j) with i < j by
  # construction (findCandidatePairs), so dropping column 2 keeps the first
  # occurrence of every duplicate pair. The previous version kept only the
  # global minimum over ALL duplicate indices, which merged unrelated
  # duplicate groups and wrongly removed their first occurrences; it also
  # misused ifelse() on matrix branches, which evaluates both sides and
  # breaks on the empty-matches slice matches[1:0,].
  keepMask = matrix(1, rows=nrow(X), cols=1)
  if (nrow(matches) > 0) {
    for (i in 1:nrow(matches)) {
      keepMask[as.scalar(matches[i, 2]), 1] = 0
    }
  }
  # Step 9: Split X by the mask. Using removeEmpty with a select vector
  # avoids seeding the outputs with a spurious copy of X[1,] (the former
  # removeEmpty(target=X[1,], ...) initialization did NOT yield an empty
  # frame, so row 1 leaked into both outputs) and replaces the O(n)
  # row-by-row rbind loops.
  Y_unique = removeEmpty(target=X, margin="rows", select=keepMask)
  Y_duplicates = removeEmpty(target=X, margin="rows", select=(1 - keepMask))
}
# Encodes each input tuple as a dense vector by averaging the GloVe
# embeddings of its tokens (simple mean pooling, no weighting).
#
# INPUT:  X            Frame[String] n x d, raw tuples (one row per tuple)
#         vocab        Frame[String] |V| x 1, vocabulary aligned row-wise
#                      with gloveMatrix
#         gloveMatrix  Matrix[Double] |V| x e, pretrained GloVe embeddings
# OUTPUT: V            Matrix[Double] n x e, one averaged embedding per row;
#                      rows with no vocabulary hit remain all-zero
computeDRMatrix = function(Frame[String] X, Frame[String] vocab, Matrix[Double] gloveMatrix)
return(Matrix[Double] V)
{
# TODO: Vectorize this implementation with dedicated transform-encode permutation matrices
n = nrow(X)
d = ncol(gloveMatrix)
V = matrix(0, rows=n, cols=d) # define output matrix
for (i in 1:n) {
row = X[i,]
# NOTE(review): relies on the external UDF UtilFunctions.cleanAndTokenizeRow;
# presumably it lowercases/strips punctuation and emits one token per cell
# -- confirm its contract before changing this loop
words = transformapply(row, "UtilFunctions.cleanAndTokenizeRow")
sumVec = matrix(0, rows=1, cols=d) # running sum of matched embeddings
count = 0                          # number of tokens found in the vocabulary
for (k in 1:length(words)) {
w = words[k]
idx = -1
found = FALSE
# linear scan over the vocabulary: O(|V|) per token, and the `found`
# flag only skips the comparison -- the loop itself runs to completion
for (m in 1:nrow(vocab)) {
if (!found & vocab[m,1] == w) {
idx = m
found = TRUE
}
}
# word found: accumulate its embedding vector
if (idx > 0) {
sumVec = sumVec + gloveMatrix[idx,]
count = count + 1
}
}
# mean pooling; tuples with no vocabulary hit keep the all-zero vector
if (count > 0) {
V[i,] = sumVec / count
}
else {
V[i,] = sumVec
}
}
}
# Computes binary LSH codes: each DR vector is projected onto the K
# hyperplane normals in H, and only the sign of every projection is kept
# (1 if on/above the hyperplane, 0 otherwise).
#
# INPUT:  V  Matrix[Double] n x d, distributed representations
#         H  Matrix[Double] K x d, random hyperplane normals
# OUTPUT: hashCodes  Matrix[Double] n x K, 0/1 sign bits
computeLSH = function(Matrix[Double] V, Matrix[Double] H)
return(Matrix[Double] hashCodes)
{
  # single fused expression: projection followed by elementwise sign test
  hashCodes = ((V %*% t(H)) >= 0)
}
# Maps each row of K hash bits to a single bucket ID by interpreting the
# row as a binary number (first bit is most significant):
#   bucketID = sum_k code[k] * 2^(K-k)
#
# INPUT:  hashCodes  Matrix[Double] n x K, 0/1 LSH sign bits
# OUTPUT: bucketIDs  Matrix[Double] n x 1, integer bucket identifiers
formBuckets = function(Matrix[Double] hashCodes)
return(Matrix[Double] bucketIDs)
{
  K = ncol(hashCodes)
  # weights (2^(K-1), ..., 2^0) as a K x 1 column vector; vectorized via
  # seq(), replacing the former scalar loop (resolves the TODO)
  weights = 2 ^ seq(K - 1, 0, -1)
  bucketIDs = hashCodes %*% weights
}
# Generates candidate pairs: all index pairs (i, j) with i < j whose rows
# fell into the same LSH bucket. Vectorized via a pairwise equality matrix
# and an upper-triangle mask, replacing the former O(n^2) scalar loop with
# per-hit rbind growth; also handles n == 1 cleanly (the old 1:(n-1) loop
# bound degenerated there).
#
# INPUT:  bucketIDs  Matrix[Double] n x 1, bucket ID per tuple
# OUTPUT: pairs      Matrix[Double] p x 2, rows (i, j) with i < j, in
#                    row-major order over the upper triangle (same order
#                    as the original nested loop)
findCandidatePairs = function(Matrix[Double] bucketIDs)
return(Matrix[Double] pairs)
{
  n = nrow(bucketIDs)
  ones = matrix(1, rows=n, cols=1)
  R = seq(1, n) %*% t(ones)   # R[i,j] = i
  C = ones %*% t(seq(1, n))   # C[i,j] = j
  # eq[i,j] = 1 iff tuples i and j share a bucket
  eq = (bucketIDs %*% t(ones)) == (ones %*% t(bucketIDs))
  # restrict to strict upper triangle so each unordered pair appears once
  sel = eq * (R < C)
  if (sum(sel) > 0) {
    # flatten row-major and drop non-selected cells; I and J stay aligned
    # because both are zeroed at exactly the same positions
    I = removeEmpty(target=matrix(R * sel, rows=n*n, cols=1), margin="rows")
    J = removeEmpty(target=matrix(C * sel, rows=n*n, cols=1), margin="rows")
    pairs = cbind(I, J)
  }
  else {
    pairs = matrix(0, rows=0, cols=2)
  }
}
# Computes one similarity score per candidate pair.
#   "cosine":    cosine similarity in [-1, 1]; pairs involving an all-zero
#                DR vector (e.g., no vocabulary hit) yield 0 instead of NaN
#   "euclidean": negated euclidean distance, so larger is still "more similar"
#
# INPUT:  V                  Matrix[Double] n x d, distributed representations
#         pairs              Matrix[Double] m x 2, candidate index pairs
#         similarityMeasure  String, "cosine" or "euclidean"
# OUTPUT: similarities       Matrix[Double] m x 1, score per pair
computeSimilarity = function(Matrix[Double] V, Matrix[Double] pairs, String similarityMeasure)
return(Matrix[Double] similarities)
{
  # validate once up front instead of re-checking inside the loop
  if (similarityMeasure != "cosine" & similarityMeasure != "euclidean") {
    stop("Unsupported similarity measure: " + similarityMeasure)
  }
  m = nrow(pairs)
  similarities = matrix(0.0, rows=m, cols=1)
  if (m > 0) {
    for (k in 1:m) {
      i = as.scalar(pairs[k, 1])
      j = as.scalar(pairs[k, 2])
      vi = V[i, ]
      vj = V[j, ]
      if (similarityMeasure == "cosine") {
        denom = sqrt(sum(vi ^ 2)) * sqrt(sum(vj ^ 2))
        if (denom > 0) {
          sim = sum(vi * vj) / denom
        }
        else {
          # undefined cosine for a zero-norm vector: report 0 rather than
          # NaN so downstream thresholding stays well-defined
          sim = 0.0
        }
      }
      else {
        diff = vi - vj
        sim = -1 * sqrt(sum(diff ^ 2))
      }
      similarities[k, 1] = sim
    }
  }
}
# Keeps the candidate pairs whose similarity meets the threshold.
# Vectorized: builds a 0/1 select vector and filters the pair rows with
# removeEmpty, replacing the former scalar loop which both grew `matches`
# via rbind and compared a 1x1 matrix slice (similarities[i,1]) against a
# scalar threshold without as.scalar().
#
# INPUT:  pairs         Matrix[Double] m x 2, candidate index pairs
#         similarities  Matrix[Double] m x 1, score per pair
#         threshold     Double, minimum similarity for a duplicate
# OUTPUT: matches       Matrix[Double] p x 2, pairs with sim >= threshold
filterDuplicates = function(Matrix[Double] pairs, Matrix[Double] similarities, Double threshold)
return(Matrix[Double] matches)
{
  keep = (similarities >= threshold)
  if (nrow(pairs) > 0 & sum(keep) > 0) {
    matches = removeEmpty(target=pairs, margin="rows", select=keep)
  }
  else {
    # explicit 0 x 2 result so callers can branch on nrow(matches)
    matches = matrix(0, rows=0, cols=2)
  }
}