blob: 200f8451e58d51f86b9392e1058c70e338398f95 [file] [log] [blame]
package org.apache.rya.accumulo.pcj.iterators;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;

import org.apache.rya.api.domain.VarNameUtils;
import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;

import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
/**
* This {@link CloseableIteration} performs a hash join by joining each
* {@link Map.Entry<String, BindingSet>} with all corresponding
* {@link BindingSet} in a Multimap with the same String key.
*
*/
public class BindingSetHashJoinIterator implements
CloseableIteration<BindingSet, QueryEvaluationException> {
//BindingSets passed to PCJ mapped according to values
//associated with common variables with table
private Multimap<String, BindingSet> bindingJoinVarHash;
//BindingSets taken from PCJ table
private CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter;
private Iterator<BindingSet> joinedBindingSets = Collections
.emptyIterator();
//If PCJ contains LeftJoin, this is a set of variable in LeftJoin. Used when performing Join.
private Set<String> unAssuredVariables;
//indicates when HashJoin formed from a single collection of join variable or if the size and
//collection of join variables varies -- this is to optimize the join process
private HashJoinType type;
private final BindingSet EMPTY_BINDINGSET = new QueryBindingSet();
private BindingSet next;
private boolean hasNextCalled = false;
private boolean isEmpty = false;
/**
* Enum type to indicate whether HashJoin will be performed over a fixed
* subset of variables common to each {@link BindingSet}, or if there is a
* collection of variable subsets over which to join.
*
*/
public enum HashJoinType {
CONSTANT_JOIN_VAR, VARIABLE_JOIN_VAR
}
public BindingSetHashJoinIterator(
Multimap<String, BindingSet> bindingJoinVarHash,
CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter,
Set<String> unAssuredVariables, HashJoinType type) {
this.bindingJoinVarHash = bindingJoinVarHash;
this.joinIter = joinIter;
this.type = type;
this.unAssuredVariables = unAssuredVariables;
}
@Override
public boolean hasNext() throws QueryEvaluationException {
if (!hasNextCalled && !isEmpty) {
while (joinedBindingSets.hasNext() || joinIter.hasNext()) {
if (!joinedBindingSets.hasNext()) {
Entry<String, BindingSet> entry = joinIter.next();
joinedBindingSets = joinBindingSetEntry(entry);
}
if (!joinedBindingSets.hasNext()) {
continue;
}
next = joinedBindingSets.next();
hasNextCalled = true;
return true;
}
isEmpty = true;
return false;
} else {
return !isEmpty;
}
}
@Override
public BindingSet next() throws QueryEvaluationException {
if (hasNextCalled) {
hasNextCalled = false;
} else if (isEmpty) {
throw new NoSuchElementException();
} else {
if (this.hasNext()) {
hasNextCalled = false;
} else {
throw new NoSuchElementException();
}
}
return next;
}
@Override
public void remove() throws QueryEvaluationException {
throw new UnsupportedOperationException();
}
@Override
public void close() throws QueryEvaluationException {
joinIter.close();
}
/**
* This method takes the valOrderString, which is a key used for computing
* hash joins, and generates multiple keys by pulling off one delimiter
* separated component at a time. This is used when the size of the join key
* varies from {@link Map.Entry} to Entry. It allows the BindingSet to be
* joined using all prefixes of the key.
*
* @param valOrderString
* - key used for hash join
* @return
*/
private List<String> getValueOrders(String valOrderString) {
List<String> valueOrders = new ArrayList<>();
String[] splitValOrderString = valOrderString
.split(ExternalTupleSet.VALUE_DELIM);
StringBuffer buffer = new StringBuffer();
buffer.append(splitValOrderString[0]);
valueOrders.add(buffer.substring(0));
for (int i = 1; i < splitValOrderString.length; i++) {
buffer.append(ExternalTupleSet.VALUE_DELIM + splitValOrderString[i]);
valueOrders.add(buffer.substring(0));
}
return valueOrders;
}
/**
* This method verifies that all common variables have a common value and
* then joins the BindingSets together. In the case that the PCJ contains a
* LeftJoin, if the leftBs and rightBs have a common variable with distinct
* values and that common variable is unassured (only appears in LeftJoin),
* this method uses the value corresponding to leftBs.
*
* @param leftBs
* - BindingSet passed into PCJ
* @param rightBs
* - PCJ BindingSet
* @return - joined BindingSet
*/
private BindingSet joinBindingSets(BindingSet leftBs, BindingSet rightBs) {
Set<String> commonVars = Sets.intersection(leftBs.getBindingNames(),
rightBs.getBindingNames());
// compare values associated with common variables to make sure
// BindingSets can be joined. Possible for leftBs and rightBs
// to have a common unAssuredVariable in event PCJ contains LeftJoin.
// if values corresponding to common unAssuredVariable do not agree
// add value corresponding to leftBs
for (String s : commonVars) {
if (!leftBs.getValue(s).equals(rightBs.getValue(s))
&& !unAssuredVariables.contains(s)) {
return EMPTY_BINDINGSET;
}
}
QueryBindingSet bs = new QueryBindingSet(removeConstants(leftBs));
rightBs = removeConstants(rightBs);
// only add Bindings corresponding to variables that have no value
// assigned. This takes into account case where leftBs and rightBs
// share a common, unAssuredVariable. In this case, use value
// corresponding
// to leftBs, which is effectively performing a LeftJoin.
for (String s : rightBs.getBindingNames()) {
if (bs.getValue(s) == null) {
bs.addBinding(s, rightBs.getValue(s));
}
}
return bs;
}
private BindingSet removeConstants(BindingSet bs) {
QueryBindingSet bSet = new QueryBindingSet();
for (String s : bs.getBindingNames()) {
if (!VarNameUtils.isConstant(s)) {
bSet.addBinding(bs.getBinding(s));
}
}
return bSet;
}
/**
* This method returns an Iterator which joins the given Entry's BindingSet
* to all BindingSets which matching the Entry's key.
*
* @param entry - entry to be joined
* @return - Iterator over joined BindingSets
*/
private Iterator<BindingSet> joinBindingSetEntry(
Map.Entry<String, BindingSet> entry) {
List<Collection<BindingSet>> matches = new ArrayList<>();
if (type == HashJoinType.CONSTANT_JOIN_VAR) {
if (bindingJoinVarHash.containsKey(entry.getKey())) {
matches.add(bindingJoinVarHash.get(entry.getKey()));
}
} else {
List<String> valOrders = getValueOrders(entry.getKey());
for (String s : valOrders) {
if (bindingJoinVarHash.containsKey(s)) {
matches.add(bindingJoinVarHash.get(s));
}
}
}
if (matches.size() == 0) {
return Collections.emptyIterator();
} else {
return new BindingSetCollectionsJoinIterator(entry.getValue(),
matches);
}
}
/**
* Given a BindingSet and a List of Collections of BindingSets, this
* Iterator joins the BindingSet with the BindingSets in each Collection
*
*/
private class BindingSetCollectionsJoinIterator implements
Iterator<BindingSet> {
private Iterator<Collection<BindingSet>> collectionIter;
private Iterator<BindingSet> bsIter = Collections.emptyIterator();
private BindingSet next;
private BindingSet joinBs;
private boolean hasNextCalled = false;
private boolean isEmpty = false;
public BindingSetCollectionsJoinIterator(BindingSet bs,
List<Collection<BindingSet>> collection) {
this.collectionIter = collection.iterator();
this.joinBs = bs;
}
@Override
public boolean hasNext() {
if (!hasNextCalled && !isEmpty) {
while (bsIter.hasNext() || collectionIter.hasNext()) {
if (!bsIter.hasNext()) {
bsIter = collectionIter.next().iterator();
}
next = joinBindingSets(bsIter.next(), joinBs);
if (next == EMPTY_BINDINGSET) {
continue;
}
hasNextCalled = true;
return true;
}
isEmpty = true;
return false;
} else {
return !isEmpty;
}
}
@Override
public BindingSet next() {
if (hasNextCalled) {
hasNextCalled = false;
} else if (isEmpty) {
throw new NoSuchElementException();
} else {
if (this.hasNext()) {
hasNextCalled = false;
} else {
throw new NoSuchElementException();
}
}
return next;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}