| #------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| #------------------------------------------------------------- |
| |
# This raJoin-function takes two matrix datasets as input and performs the
# relational operation: inner join on one key column per input
#
# INPUT:
# ------------------------------------------------------------------------------
# A       Matrix of left input data [shape: N x M]
# colA    Integer indicating the column index of matrix A to execute inner join command
# B       Matrix of right input data [shape: K x L]
# colB    Integer indicating the column index of matrix B to execute inner join command
# method  Join implementation method (nested-loop, sort-merge, hash, hash2)
# ------------------------------------------------------------------------------
#
# OUTPUT:
# ------------------------------------------------------------------------------
# Y       Matrix of joined data [shape: N' x (M + L)], one row per matching pair
#         of rows (columns of A followed by columns of B); N' is 0 if nothing matches
# ------------------------------------------------------------------------------

m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B,
    Integer colB, String method)
  return (Matrix[Double] Y)
{
  # Reject unknown methods up front; otherwise no branch below would assign Y
  if( method != "nested-loop" & method != "sort-merge" & method != "hash" & method != "hash2" )
    stop("raJoin: unsupported join method '" + method + "' (expected nested-loop, sort-merge, hash, or hash2)")

  # Sort the input Matrix with specific column in order to ensure same output order
  A = order(target = A, by = colA, decreasing=FALSE, index.return=FALSE)
  B = order(target = B, by = colB, decreasing=FALSE, index.return=FALSE)

  # Every result row holds all columns of A followed by all columns of B;
  # empty results use the same width in all methods
  ncolsA = ncol(A)
  ncolsB = ncol(B)
  ncolsY = ncolsA + ncolsB

  if (method == "nested-loop") {
    # matrix of result data, grown by one row per matching pair
    Y = matrix(0, rows=0, cols=ncolsY)

    for (i in 1:nrow(A)) {
      for (j in 1:nrow(B)) {
        if (as.scalar(A[i, colA] == B[j, colB])) {
          # combine the matching rows of A and B and append them to the result
          Y = rbind(Y, cbind(A[i,], B[j,]))
        }
      }
    }
  }
  # The sort-merge method is from original paper: Query Processing on Tensor Computation Runtimes, section 5-2
  else if (method == "sort-merge") {
    # get join key columns
    left = A[, colA]
    right = B[, colB]

    # Ensure histograms are aligned: both are indexed by key value and sized by
    # the largest key of either side (keys are used directly as table() row indexes)
    commonKeys = max(max(left), max(right))

    # Build histograms for the left and right key columns
    leftHist = table(left, 1, commonKeys, 1)
    rightHist = table(right, 1, commonKeys, 1)

    # Compute the number of output rows for each key (left count x right count)
    histMul = leftHist * rightHist

    # Compute the prefix sums of histograms
    cumLeftHist = cumsum(leftHist)
    cumRightHist = cumsum(rightHist)
    cumHistMul = cumsum(histMul)

    # Total output size is the last prefix sum
    outSize = as.scalar(cumHistMul[nrow(cumHistMul), 1])
    if( outSize > 0 ) {
      # one offset per output row
      offset = seq(1, outSize, 1)

      # Find the bucket of matching keys to which each output belongs
      outBucket = parallelBinarySearch(offset, cumHistMul)

      # TODO performance - try avoid iterating over rows
      # create a mask to apply the outBucket value as an index of following matrix:
      # row r of mask is 1 in the column equal to outBucket[r], so mask %*% v picks v[outBucket[r]]
      seqMatrix = matrix(1, rows=nrow(outBucket), cols=1) %*% t(seq(1, nrow(cumHistMul)))
      mask = outer(outBucket, seqMatrix, "==")

      # number of right rows sharing the key of each output row
      rightCnt = mask %*% rightHist

      # zero-based position of each output row within its key bucket
      updatedoffset = offset - (mask %*% (cumHistMul - histMul)) - 1
      # rows of the sorted inputs that form each output row: the bucket-local
      # position is split into a left part (quotient) and a right part (remainder)
      leftOutIdx = mask %*% (cumLeftHist - leftHist) + floor(updatedoffset / rightCnt) + 1
      rightOutIdx = mask %*% (cumRightHist - rightHist) + (updatedoffset %% rightCnt) + 1

      nrows = nrow(offset)
      Y = matrix(0, rows=nrows, cols=ncolsY)

      # Populate the output matrix Y
      for (i in 1:nrows) {
        Y[i, 1:ncolsA] = A[as.scalar(leftOutIdx[i, 1]), ]
        Y[i, (ncolsA + 1):ncolsY] = B[as.scalar(rightOutIdx[i, 1]), ]
      }
    }
    else {
      # no key occurs on both sides
      Y = matrix(0, rows=0, cols=ncolsY)
    }
  }
  else if( method == "hash" ) {
    # Ensure histograms are aligned by creating a common set of keys
    commonKeys = max(max(A[,colA]), max(B[,colB]));

    # Build histograms for the left and right key columns
    leftHist = table(A[,colA], 1, commonKeys, 1)
    rightHist = table(B[,colB], 1, commonKeys, 1)
    hist = leftHist * rightHist;

    # Check for one-to-many: every key may occur at most once in A
    if( max(leftHist)>1 )
      stop("Hash join implementation only supports one-to-many joins: "+toString(leftHist));

    # Compute selection matrices P1 (one-side) with row duplication
    # keyPos1: per key value, the row position of that key in A; only keys
    # present on both sides (hist > 0) are kept
    keyPos1 = rowIndexMax(table(A[,colA], seq(1,nrow(A)), commonKeys, nrow(A)))
    keyPos1 = removeEmpty(target=keyPos1, margin="rows", select=hist);
    hist = removeEmpty(target=hist, margin="rows");
    # repeat each left row position once per matching right row
    I1 = t(cumsum(rev(t(table(seq(1,nrow(hist)),hist))))) * keyPos1
    I1 = removeEmpty(target=matrix(I1, nrow(I1)*ncol(I1),1), margin="rows"); # keys
    P1 = table(seq(1,nrow(I1)), I1);

    # Select left rows and concatenate right rows
    # NOTE(review): cbind needs as many selected left rows as rows in B, i.e. this
    # presumably expects every key of B to be present in A - TODO confirm
    Y = cbind(P1 %*% A, B);
  }
  # The hash2 method is from the original paper: Query Processing on Tensor Computation Runtimes, section 5-3
  else if ( method == "hash2" ) {
    # Get join key columns
    left = A[,colA]
    right = B[,colB]

    # Compute indexes and hash values
    leftIdx = seq(1, nrow(A))
    rightIdx = seq(1, nrow(B))
    m = max(max(left),max(right)) + 1; # Assuming a large hash table size
    leftHash = left %% m
    rightHash = right %% m

    # Build histogram of hash values for left join keys
    hashBincount = table( leftHash, 1, max(leftHash), 1 )

    # Check for one-to-many: no two left rows may share a hash value
    if( max(hashBincount) > 1 )
      stop("Hash join implementation only supports one-to-many joins: "+toString(hashBincount))

    # Initialize output indexes (stay empty if nothing matches)
    leftOutIdx = matrix(0,0,1)
    rightOutIdx = matrix(0,0,1)

    # Build hash table: a selection matrix places each left row index into the
    # slot of its hash value; unused slots stay 0
    hashTable = t(table(seq(1,nrow(leftIdx)), leftHash, nrow(leftIdx), m)) %*% leftIdx

    # Probe hash table: look up the candidate left row index for every right row
    lefCandIdx = table(seq(1, nrow(rightHash)), rightHash, nrow(rightHash), m) %*% hashTable
    validKeyMask = (lefCandIdx>0)

    # Continue only if at least one right row hit a filled slot
    if( sum(validKeyMask) > 0 ) {
      validLeftIdx = removeEmpty(target=lefCandIdx, margin="rows", select=validKeyMask)
      validRightIdx = removeEmpty(target=rightIdx, margin="rows", select=validKeyMask)

      # Fetch the join keys of the candidate pairs and compare them
      selectedValidLeftIdx = table(seq(1,nrow(validLeftIdx)), validLeftIdx, nrow(validLeftIdx), nrow(left)) %*% left
      selectedValidRightIdx = table(seq(1,nrow(validRightIdx)), validRightIdx, nrow(validRightIdx), nrow(right)) %*% right

      matchMask = ( selectedValidLeftIdx == selectedValidRightIdx )
      if ( sum(matchMask) > 0 ) {
        # keep only the candidate pairs whose keys are really equal
        leftOutIdx = removeEmpty(target=validLeftIdx, margin="rows", select=matchMask)
        rightOutIdx = removeEmpty(target=validRightIdx, margin="rows", select=matchMask)
      }
    }

    # Create output
    if ( nrow(leftOutIdx) == 0 | nrow(rightOutIdx) == 0 ) {
      Y = matrix(0, rows=0, cols=ncolsY)
    }
    else {
      Y = matrix(0, rows=nrow(leftOutIdx), cols=ncolsY)
      for( j in 1:nrow(leftOutIdx) ) {
        Y[j, ] = cbind( A[as.scalar(leftOutIdx[j, 1]), ], B[as.scalar(rightOutIdx[j, 1]), ] )
      }
    }
  }
}
| |
# Binary search helper: for every entry of offset, returns the smallest row
# index of cumHistMul whose value is greater than or equal to that entry.
# cumHistMul is expected to be non-decreasing (the caller passes a cumulative
# sum); entries for which no such row exists stay 0.
parallelBinarySearch = function (Matrix[double] offset, Matrix[double] cumHistMul)
  return (Matrix[double] matched_result)
{
  numOffsets = nrow(offset)
  lastBucket = nrow(cumHistMul)
  matched_result = matrix(0, rows=numOffsets, cols=1)

  for (i in 1:numOffsets) {
    lo = 1
    hi = lastBucket
    while (lo <= hi) {
      probe = as.integer((lo + hi) / 2)
      if ( as.scalar(offset[i] <= cumHistMul[probe]) ) {
        # probe qualifies; remember it and keep searching the lower half
        # for an earlier qualifying row
        matched_result[i] = probe
        hi = probe - 1
      }
      else {
        # probe is too small; continue in the upper half
        lo = probe + 1
      }
    }
  }
}
| |