blob: 7fa7572a3647b435462c03d07ad1d2b35278074a [file] [log] [blame]
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
# This raJoin-function takes two matrix datasets as input from where it performs
# relational operations : join
#
# INPUT:
# ------------------------------------------------------------------------------
# A Matrix of left input data [shape: N x M]
# colA Integer indicating the column index of matrix A to execute inner join command
# B Matrix of right left data [shape: N x M]
# colA Integer indicating the column index of matrix B to execute inner join command
# method Join implementation method (nested-loop, sort-merge, hash, hash2)
# ------------------------------------------------------------------------------
#
# OUTPUT:
# ------------------------------------------------------------------------------
# Y Matrix of joined data [shape N' x M] with N' <= N
# ------------------------------------------------------------------------------
m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B,
Integer colB, String method)
return (Matrix[Double] Y)
{
# Sort the input Matrix with specific column in order to ensure same output order
A = order(target = A, by = colA, decreasing=FALSE, index.return=FALSE)
B = order(target = B, by = colB, decreasing=FALSE, index.return=FALSE)
if (method == "nested-loop") {
# matrix of result data
Y = matrix(0, rows=0, cols=ncol(A) + ncol(B) )
for (i in 1:nrow(A)) {
for (j in 1:nrow(B)) {
if (as.scalar(A[i, colA] == B[j, colB])) {
# Combine the matching row from A and B to match
match = cbind(A[i,], B[j,])
# merge the match row into result Y
Y = rbind(Y, match)
}
}
}
}
# The sort-merge method is from original paper: Qery Processing on Tensor Computation Runtime, section 5-2
else if (method == "sort-merge") {
# get join key columns
left = A[, colA]
right = B[, colB]
# Sort join keys
leftIdx = seq(1, nrow(A))
rightIdx = seq(1, nrow(B))
# Ensure histograms are aligned by creating a common set of keys
commonKeys = max(max(left), max(right));
# Build histograms for the left and right key columns
leftHist = table(left, 1, commonKeys, 1)
rightHist = table(right, 1, commonKeys, 1)
# Compute the number of rows for each pair of matching keys
histMul = leftHist * rightHist
# Compute the prefx sums of histograms
cumLeftHist = cumsum(leftHist)
cumRightHist = cumsum(rightHist)
cumHistMul = cumsum(histMul)
# Initialize the output size and output offsets
outSize = cumHistMul[nrow(cumHistMul), 1]
if(as.scalar(outSize > 0)) {
offset = seq(1, as.scalar(outSize), 1)
# Find the bucket of matching keys to which each output belongs
outBucket = parallelBinarySearch(offset, cumHistMul)
# Determine the number of rows in outBucket
num_rows = nrow(outBucket)
# Compute the element-wise subtraction and store in result
# TODO performance - try avoid iterating over rows
# create a mask to apply the outBucket value as an index of following matrix
seqMatrix = matrix(1, rows=nrow(outBucket), cols=1) %*% t(seq(1, nrow(cumHistMul)))
mask = outer(outBucket, seqMatrix, "==")
updatedoffset = offset - (mask %*% (cumHistMul - histMul)) - 1
leftOutIdx = mask %*% (cumLeftHist - leftHist) + (floor(updatedoffset / mask %*% rightHist)) + 1
rightOutIdx = mask %*% (cumRightHist - rightHist) + (updatedoffset %% (mask %*% rightHist)) + 1
nrows = length(offset)
ncolsA = ncol(A)
ncolsB = ncol(B)
Y = matrix(0, rows=nrows, cols=ncolsA + ncolsB)
# Populate the output matrix Y
for (i in 1:nrows) {
Y[i, 1:ncolsA] = A[as.scalar(leftOutIdx[i, 1]), ]
Y[i, (ncolsA + 1):(ncolsA + ncolsB)] = B[as.scalar(rightOutIdx[i, 1]), ]
}
}
else{
Y = matrix(0, rows=0, cols=1)
}
}
else if( method == "hash" ) {
# Ensure histograms are aligned by creating a common set of keys
commonKeys = max(max(A[,colA]), max(B[,colB]));
# Build histograms for the left and right key columns
leftHist = table(A[,colA], 1, commonKeys, 1)
rightHist = table(B[,colB], 1, commonKeys, 1)
hist = leftHist * rightHist;
# Check for one-to-many
if( max(leftHist)>1 )
stop("Hash join implementation only supports one-to-many joins: "+toString(leftHist));
# Compute selection matrices P1 (one-side) with row duplication
keyPos1 = rowIndexMax(table(A[,colA], seq(1,nrow(A)), commonKeys, nrow(A)))
keyPos1 = removeEmpty(target=keyPos1, margin="rows", select=hist);
hist = removeEmpty(target=hist, margin="rows");
I1 = t(cumsum(rev(t(table(seq(1,nrow(hist)),hist))))) * keyPos1
I1 = removeEmpty(target=matrix(I1, nrow(I1)*ncol(I1),1), margin="rows"); # keys
P1 = table(seq(1,nrow(I1)), I1);
# Select left rows and concatenate right rows
Y = cbind(P1 %*% A, B);
}
# The hash2 method is from the original paper: Qery Processing on Tensor Computation Runtime, section 5-3
else if ( method == "hash2" ) {
# Get join key columns
left = A[,colA]
right = B [,colB]
# Compute indexes and hash values
leftIdx = seq(1, nrow(A))
rightIdx = seq(1, nrow(B))
m = max(max(left),max(right)) + 1; # Assuming a large hash table size
#m = 100
leftHash = left %% m
rightHash = right %% m
# Build histogram of hash values for left join keys
hashBincount = table( leftHash, 1, max(leftHash), 1 )
#Initialize output indexes
leftOutIdx = matrix(0,0,1)
rightOutIdx = matrix(0,0,1)
# Check for one-to-many
if( max(hashBincount) > 1 )
stop("Hash join implementation only supports one-to-many joins: "+toString(hashBincount))
# Build and probe hash table
# Initialize hash table
hashTable=matrix(0,m,1)
# Create a select or matrix and use matrix multiplication to place values
hashTable = t(table(seq(1,nrow(leftIdx)), leftHash, nrow(leftIdx), nrow(hashTable))) %*% leftIdx
# Update lefHash to skip scattered values for future iterations by setting their hashes to m
leftIdxSct = removeEmpty(target=seq(1,nrow(hashTable)), margin="rows", select=(hashTable>=1))
selectedMatrix = table(seq(1, nrow(leftIdxSct)), leftIdxSct, nrow(leftIdxSct), nrow(hashTable))
leftHash = t(selectedMatrix) %*% matrix(m, rows=nrow(leftIdxSct), cols=1, byrow=TRUE)
#Probe hash table and get the left and right indexes
validLeftIdx = matrix(0,0,1)
validRightIdx = matrix(0,0,1)
lefCandIdx = table(seq(1, nrow(rightHash)), rightHash, nrow(rightHash), nrow(hashTable)) %*% hashTable
validKeyMask = (lefCandIdx>0)
# Check if non matching
if( as.scalar(colSums(validKeyMask)) > 0 ){
validLeftIdx = removeEmpty(target=lefCandIdx, margin="rows", select=validKeyMask)
validRightIdx = removeEmpty(target=rightIdx, margin="rows", select=validKeyMask)
# Find matching join keys
selectedValidLeftIdx = table(seq(1,nrow(validLeftIdx)), validLeftIdx, nrow(validLeftIdx), nrow(left)) %*% left
selectedValidRightIdx = table(seq(1,nrow(validRightIdx)), validRightIdx, nrow(validRightIdx), cols=nrow(right)) %*% right
matchMask = ( selectedValidLeftIdx == selectedValidRightIdx )
if ( as.scalar(colSums(matchMask[,1])) > 0) {
leftMatchIdx = removeEmpty(target=validLeftIdx, margin="rows", select=matchMask)
rightMatchIdx = removeEmpty(target=validRightIdx, margin="rows", select=matchMask)
#Append indexes to global results
leftOutIdx = rbind(leftOutIdx, removeEmpty(target=leftMatchIdx, margin="rows"))
rightOutIdx = rbind(rightOutIdx, removeEmpty(target=rightMatchIdx, margin="rows"))
}
}
# Create output
if ( nrow(leftOutIdx) == 0 | nrow(rightOutIdx) == 0 ) {
Y = matrix(0, rows=0, cols=1)
}
else {
Y = matrix(0, rows=nrow(leftOutIdx), cols=ncol(A)+ncol(B))
for( j in 1:nrow(leftOutIdx) ) {
Y[j, ] = cbind( A[as.scalar(leftOutIdx[j]), ], B[as.scalar(rightOutIdx[j]), ] )
}
}
}
}
# Function to perform parallel binary search
parallelBinarySearch = function (Matrix[double] offset, Matrix[double] cumHistMul)
return (Matrix[double] matched_result)
{
n = nrow(cumHistMul)
result = matrix(0, rows=nrow(offset), cols=1)
for (i in 1:nrow(offset)) {
low = 1
high = n
while (low <= high) {
mid = as.integer((low + high) / 2)
if ( as.scalar(offset[i] <= cumHistMul[mid]) ) {
result[i] = mid
high = mid - 1
}
else {
low = mid + 1
}
}
}
matched_result = result
}