blob: 4c07294ab884d1013cf6f4d9b87e4d3c589bb8d5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.iejoin.operators.java_helpers;
import org.apache.wayang.iejoin.data.Data;
import org.apache.wayang.iejoin.operators.IEJoinMasterOperator;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.BitSet;
/**
* Created by khayyzy on 5/28/16.
*/
public class BitSetJoin<Type0 extends Comparable<Type0>, Type1 extends Comparable<Type1>, Input> {
/**
*
*/
private static final long serialVersionUID = 2953731260972596253L;
boolean list1ASC;
boolean list2ASC;
boolean list1ASCSec;
boolean list2ASCSec;
boolean equalReverse;
boolean sameRDD;
// boolean trimEqResults;
IEJoinMasterOperator.JoinCondition c1;
public BitSetJoin(boolean list1ASC, boolean list2ASC, boolean list1ASCSec,
boolean list2ASCSec, boolean equalReverse, boolean sameRDD,
IEJoinMasterOperator.JoinCondition c1) {
this.list1ASC = list1ASC;
this.list2ASC = list2ASC;
this.list1ASCSec = list1ASCSec;
this.list2ASCSec = list2ASCSec;
this.equalReverse = equalReverse;
this.sameRDD = sameRDD;
this.c1 = c1;
}
public ArrayList<Tuple2<Data<Type0, Type1>, Input>> merge(ArrayList<Tuple2<Data<Type0, Type1>, Input>> lst1,
ArrayList<Tuple2<Data<Type0, Type1>, Input>> lst2,
boolean asc1, boolean asc2) {
int totalSize = lst1.size() + lst2.size(); // every element in the set
ArrayList<Tuple2<Data<Type0, Type1>, Input>> result = new ArrayList<Tuple2<Data<Type0, Type1>, Input>>(totalSize);
int i = 0;
int j = 0;
DataComparator<Type0, Type1, Input> dc = new DataComparator<>(asc1, asc2);
// short cut if the two lists where already sorted
if (dc.compare(lst1.get(lst1.size() - 1), lst2.get(0)) < 0) {
//System.arraycopy(lst1, 0, result, 0, lst1.size());
int jj = 0;
for (int ii = 0; ii < lst1.size(); ii++) {
result.add(lst1.get(ii));
}
int jjj = lst1.size();
for (int iii = 0; iii < lst2.size(); iii++) {
result.add(lst2.get(iii));
}
//System.arraycopy(lst2, 0, result, lst1.size(), lst2.size());
return result;
}
int k = 0;
while (i + j < totalSize) {
if (i < lst1.size() && j < lst2.size()) {
if (dc.compare(lst1.get(i), lst2.get(j)) < 0) {
result.add(lst1.get(i++));
// i++;
} else {
result.add((lst2.get(j++)));
// j++;
}
} else if (i < lst1.size()) {
// result.add(lst1[i++]);
// i++;
//System.arraycopy(lst1, i, result, k, (lst1.size() - i));
int jj = k;
for (int x = i; x < (lst1.size() - i); x++) {
result.add(lst1.get(x));
jj++;
}
return result;
} else {
// result.add(lst2[j++]);
// j++;
//System.arraycopy(lst2, j, result, k, (lst2.size() - j));
int jjj = k;
for (int iii = j; iii < (lst2.size() - j); iii++) {
result.add(lst2.get(iii));
}
return result;
}
}
return result;
}
public ArrayList<Tuple2<Input, Input>> call(ArrayList<Tuple2<Data<Type0, Type1>, Input>> lst1a,
ArrayList<Tuple2<Data<Type0, Type1>, Input>> lst1b) {
// ArrayList<Tuple2<Long, Long>> output = new ArrayList<Tuple2<Long,
// Long>>(1);
if (sameRDD) {
int[] permutationArray = new int[lst1a.size()];
for (int i = 0; i < permutationArray.length; i++) {
permutationArray[i] = i;
}
ArrayList<Tuple2<Data<Type0, Type1>, Input>> list2 = new ArrayList<Tuple2<Data<Type0, Type1>, Input>>();//new Tuple2<Data<Type0,Type1>,Input>[lst1a.length];
//Collections.copy(lst1a,list2);
list2.addAll(lst1a);
new myMergeSort<Type0, Type1, Input>().sort(list2, permutationArray, new revDataComparator<Type0, Type1, Input>(
list2ASC, list2ASCSec, equalReverse));
ArrayList<Tuple2<Input, Input>> wilResult = getViolationsSelf(lst1a,
permutationArray);
return wilResult;
} else {
// reset pivot flag
for (int i = 0; i < lst1b.size(); i++) {
lst1b.get(i)._1().resetPivot();
}
ArrayList<Tuple2<Data<Type0, Type1>, Input>> list1 = merge(lst1a, lst1b, list1ASC, list1ASCSec);
int[] permutationArray = new int[list1.size()];
for (int i = 0; i < permutationArray.length; i++) {
permutationArray[i] = i;
}
ArrayList<Tuple2<Data<Type0, Type1>, Input>> list2 = new ArrayList<Tuple2<Data<Type0, Type1>, Input>>();//Tuple2<Data<Type0,Type1>,Record>[list1.length];
//System.arraycopy(list1, 0, list2, 0, list1.size());
//Collections.copy(list1,list2);
list2.addAll(list1);
new myMergeSort<Type0, Type1, Input>().sort(list2, permutationArray, new revDataComparator<Type0, Type1, Input>(
list2ASC, list2ASCSec, equalReverse));
ArrayList<Tuple2<Input, Input>> wilResult = getViolationsNonSelf(
list1, permutationArray);
return wilResult;
}
// return output;
}
private ArrayList<Tuple2<Input, Input>> getViolationsSelf(ArrayList<Tuple2<Data<Type0, Type1>, Input>> cond1,
int[] permutationArray) {
ArrayList<Tuple2<Input, Input>> violation = new ArrayList<Tuple2<Input, Input>>(
300000);
long cnt = 0;
int chunckSize = Math.min(permutationArray.length, 1024); // in bit
BitSet bitArray = new BitSet(permutationArray.length);
int indexSize = permutationArray.length / chunckSize;
if (permutationArray.length % chunckSize != 0)
++indexSize;
short[] bitIndex = new short[indexSize];
for (int k = 0; k < bitIndex.length; k++) {
bitIndex[k] = 0;
}
int max = 0;
int offset = (equalReverse == true ? 0 : 1);
for (int k = 0; k < permutationArray.length; k++) {
// scan bit index
int bIndex = permutationArray[k] / chunckSize;
int iter = 0;
// if both conditions are equal do a self join
bitArray.set(permutationArray[k]);
bitIndex[bIndex] = (short) (bitIndex[bIndex] + 1);
max = Math.max(max, bIndex + 1);
for (int z = bIndex; z < max; z++) {
if (bitIndex[z] > 0) {
// scan the chunk
int start = iter == 0 ? permutationArray[k] + offset : z
* chunckSize;
int end = Math.min((z * chunckSize) + chunckSize,
permutationArray.length);
for (int l = start; l < end; l++) {
if (bitArray.get(l)) {
violation.add(new Tuple2<Input, Input>(
cond1.get(permutationArray[k])._2(),//getRowID(),
cond1.get(l)._2()));//getRowID()));
}
}
}
iter++;
}
}
// System.out.println("CNT = "+cnt);
return violation;
}
private ArrayList<Tuple2<Input, Input>> getViolationsNonSelf(ArrayList<Tuple2<Data<Type0, Type1>, Input>> cond1,
int[] permutationArray) {
ArrayList<Tuple2<Input, Input>> violation = new ArrayList<Tuple2<Input, Input>>(
300000);
long cnt = 0;
int chunckSize = Math.min(permutationArray.length, 1024); // in bit
BitSet bitArray = new BitSet(permutationArray.length);
int indexSize = permutationArray.length / chunckSize;
if (permutationArray.length % chunckSize != 0)
++indexSize;
short[] bitIndex = new short[indexSize];
for (int k = 0; k < bitIndex.length; k++) {
bitIndex[k] = 0;
}
int max = 0;
for (int k = 0; k < permutationArray.length; k++) {
// scan bit index if only cond1.get(permutationArray[k]) is primary
// pivot
int bIndex = permutationArray[k] / chunckSize;
bitArray.set(permutationArray[k]);
bitIndex[bIndex] = (short) (bitIndex[bIndex] + 1);
max = Math.max(max, bIndex + 1);
if (cond1.get(permutationArray[k])._1().isPivot()) {
int iter = 0;
for (int z = bIndex; z < max; z++) {
if (bitIndex[z] > 0) {
// scan the chunk
int start = iter == 0 ? permutationArray[k] + 1 : z
* chunckSize;
int end = Math.min((z * chunckSize) + chunckSize,
permutationArray.length);
for (int l = start; l < end; l++) {
if (bitArray.get(l)) {
if (!cond1.get(l)._1().isPivot()) {
// cnt=cnt+1;
violation.add(new Tuple2<Input, Input>(
cond1.get(permutationArray[k])._2()
//.getRowID()
, cond1.get(l)._2()));//.getRowID()));
}
}
}
}
iter++;
}
}
}
// System.out.println("CNT = "+cnt);
return violation;
}
}