blob: 59a260e52bff2e331633a9c4ac86626475a20c3d [file] [log] [blame]
package org.apache.rya.indexing.accumulo.entity;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.domain.VarNameUtils;
import org.apache.rya.api.persist.joinselect.SelectivityEvalDAO;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.joinselect.AccumuloSelectivityEvalDAO;
import org.apache.rya.prospector.service.ProspectorServiceEvalStatsDAO;
import org.apache.rya.rdftriplestore.inference.DoNotExpandSP;
import org.apache.rya.rdftriplestore.utils.FixedStatementPattern;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.Dataset;
import org.eclipse.rdf4j.query.algebra.Filter;
import org.eclipse.rdf4j.query.algebra.Join;
import org.eclipse.rdf4j.query.algebra.QueryModelNode;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryOptimizer;
import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
 * Query optimizer that groups statement patterns sharing a common subject/object variable (or
 * constant) into star queries and replaces each group with a single {@link EntityTupleSet}.
 * <p>
 * When both statistics and selectivity are enabled in the configuration, an
 * {@link AccumuloSelectivityEvalDAO} is used to rank candidate groups; otherwise (or when the DAO
 * could not be created) every group is ranked by its size, with groups keyed on a constant
 * weighted ten times higher.
 */
public class EntityOptimizer implements QueryOptimizer, Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(EntityOptimizer.class);

    /** Selectivity statistics used for ranking; {@code null} means "rank by group size only". */
    private SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> eval;
    private RdfCloudTripleStoreConfiguration conf;
    /** Whether {@link #eval} has already been decided, so {@link #setConf} does not rebuild it. */
    private boolean isEvalDaoSet = false;

    /**
     * Creates an optimizer without configuration; {@link #setConf(Configuration)} is expected to
     * be called before {@link #optimize(TupleExpr, Dataset, BindingSet)}.
     */
    public EntityOptimizer() {
    }

    /**
     * Creates an optimizer for the given configuration, connecting to Accumulo to build a
     * selectivity DAO when both statistics and selectivity are enabled.
     *
     * @param conf the Rya configuration (must not be {@code null})
     */
    public EntityOptimizer(final RdfCloudTripleStoreConfiguration conf) {
        this.eval = createEvalDao(conf, "A problem was encountered while constructing the EntityOptimizer.");
        this.isEvalDaoSet = true;
        this.conf = conf;
    }

    /**
     * Creates an optimizer that ranks star queries with the supplied DAO and uses its
     * configuration.
     *
     * @param eval the selectivity DAO (must not be {@code null})
     */
    public EntityOptimizer(final SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> eval) {
        this.eval = eval;
        this.conf = eval.getConf();
        this.isEvalDaoSet = true;
    }

    /**
     * Sets the configuration, wrapping a plain Hadoop {@link Configuration} in an
     * {@link AccumuloRdfConfiguration}. The selectivity DAO is only built if no constructor has
     * already decided it.
     */
    @Override
    public void setConf(final Configuration conf) {
        if (conf instanceof RdfCloudTripleStoreConfiguration) {
            this.conf = (RdfCloudTripleStoreConfiguration) conf;
        } else {
            this.conf = new AccumuloRdfConfiguration(conf);
        }
        if (!isEvalDaoSet) {
            eval = createEvalDao(this.conf,
                    "A problem was encountered while setting the Configuration for the EntityOptimizer.");
            isEvalDaoSet = true;
        }
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    /**
     * Builds the selectivity DAO for {@code conf}.
     *
     * @param conf           the configuration to read the stats/selectivity flags and connection from
     * @param failureMessage message logged if the Accumulo connection cannot be established
     * @return an initialised DAO, or {@code null} when statistics/selectivity are disabled or the
     *         connection failed (the optimizer then ranks groups by size only)
     */
    private static SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> createEvalDao(
            final RdfCloudTripleStoreConfiguration conf, final String failureMessage) {
        if (!(conf.isUseStats() && conf.isUseSelectivity())) {
            return null;
        }
        try {
            // One connector is shared by both DAOs instead of opening two.
            final Connector connector = ConfigUtils.getConnector(conf);
            final AccumuloSelectivityEvalDAO dao = new AccumuloSelectivityEvalDAO(conf, connector);
            dao.setRdfEvalDAO(new ProspectorServiceEvalStatsDAO(connector, conf));
            dao.init();
            return dao;
        } catch (final AccumuloException | AccumuloSecurityException e) {
            LOG.warn(failureMessage, e);
            return null;
        }
    }

    /**
     * Rewrites every join in {@code tupleExpr}, replacing groups of statement patterns that form a
     * valid {@link StarQuery} with {@link EntityTupleSet}s.
     *
     * @param tupleExpr the query tree to rewrite in place
     * @param dataset   not used
     * @param bindings  not used
     */
    @Override
    public void optimize(final TupleExpr tupleExpr, final Dataset dataset, final BindingSet bindings) {
        tupleExpr.visit(new JoinVisitor());
    }

    /**
     * Visitor that flattens each join hierarchy, extracts star queries from it and rebuilds the
     * join with the resulting {@link EntityTupleSet}s.
     */
    protected class JoinVisitor extends AbstractQueryModelVisitor<RuntimeException> {

        @Override
        public void meet(final Join node) {
            // Joins around a FixedStatementPattern/DoNotExpandSP are treated as opaque. This check
            // must match getJoinArgs, which keeps such joins as a single argument; otherwise
            // visiting that argument below would recurse into the same join forever.
            if (isOpaqueJoin(node)) {
                return;
            }
            try {
                final List<TupleExpr> joinArgs = getJoinArgs(node, new ArrayList<TupleExpr>());
                final HashMultimap<String, StatementPattern> varMap = getVarBins(joinArgs);
                while (!varMap.keySet().isEmpty()) {
                    final String key = getHighestPriorityKey(varMap);
                    constructTuple(varMap, joinArgs, key);
                }
                final List<TupleExpr> filterChain = getFilterChain(joinArgs);
                for (final TupleExpr te : joinArgs) {
                    // Leaves cannot contain further joins; only descend into composite arguments.
                    if (!(te instanceof StatementPattern) && !(te instanceof EntityTupleSet)) {
                        te.visit(this);
                    }
                }
                // Replace old join hierarchy
                node.replaceWith(getNewJoin(joinArgs, filterChain));
            } catch (final Exception e) {
                // Best effort: failures are logged rather than propagated, so the query can still
                // be evaluated. NOTE(review): the join may be left partially rewritten.
                LOG.error("A problem was encountered while optimizing a join with the EntityOptimizer.", e);
            }
        }

        /**
         * Returns {@code true} for joins with a {@link FixedStatementPattern} on the left or a
         * {@link DoNotExpandSP} on the right (presumably produced by inference rewriting), which
         * this optimizer neither flattens nor rewrites.
         */
        private boolean isOpaqueJoin(final Join join) {
            return join.getLeftArg() instanceof FixedStatementPattern
                    || join.getRightArg() instanceof DoNotExpandSP;
        }

        /**
         * Removes every {@link Filter} from {@code joinArgs} and links them into a single chain
         * (each filter's argument becomes the next filter).
         *
         * @return an empty list, {@code [top]}, or {@code [top, bottom]} of the chain
         */
        private List<TupleExpr> getFilterChain(final List<TupleExpr> joinArgs) {
            final List<TupleExpr> filterTopBottom = Lists.newArrayList();
            TupleExpr filterChainTop = null;
            TupleExpr filterChainBottom = null;
            for (int i = 0; i < joinArgs.size(); i++) {
                if (joinArgs.get(i) instanceof Filter) {
                    if (filterChainTop == null) {
                        filterChainTop = joinArgs.remove(i);
                        i--;
                    } else if (filterChainBottom == null) {
                        filterChainBottom = joinArgs.remove(i);
                        ((Filter) filterChainTop).setArg(filterChainBottom);
                        i--;
                    } else {
                        ((Filter) filterChainBottom).setArg(joinArgs.remove(i));
                        filterChainBottom = ((Filter) filterChainBottom).getArg();
                        i--;
                    }
                }
            }
            if (filterChainTop != null) {
                filterTopBottom.add(filterChainTop);
            }
            if (filterChainBottom != null) {
                filterTopBottom.add(filterChainBottom);
            }
            return filterTopBottom;
        }

        /**
         * Builds a left-deep join of {@code joinArgs}, hanging the joined arguments beneath the
         * bottom of {@code filterChain} when filters are present. Consumes {@code joinArgs}
         * in the multi-argument cases.
         *
         * @throws IllegalStateException if {@code joinArgs} is empty
         */
        private TupleExpr getNewJoin(final List<TupleExpr> joinArgs, final List<TupleExpr> filterChain) {
            TupleExpr newJoin;
            if (joinArgs.size() > 1) {
                if (filterChain.size() > 0) {
                    // The first argument stays outside the filter chain; the remaining arguments
                    // are joined beneath the bottom filter. NOTE(review): this assumes no filter
                    // depends only on variables bound by that first argument - confirm.
                    final TupleExpr finalJoinArg = joinArgs.remove(0);
                    TupleExpr tempJoin;
                    final TupleExpr temp = filterChain.get(0);
                    if (joinArgs.size() > 1) {
                        tempJoin = new Join(joinArgs.remove(0), joinArgs.remove(0));
                        for (final TupleExpr te : joinArgs) {
                            tempJoin = new Join(tempJoin, te);
                        }
                    } else {
                        tempJoin = joinArgs.remove(0);
                    }
                    if (filterChain.size() == 1) {
                        ((Filter) temp).setArg(tempJoin);
                    } else {
                        ((Filter) filterChain.get(1)).setArg(tempJoin);
                    }
                    newJoin = new Join(temp, finalJoinArg);
                } else {
                    newJoin = new Join(joinArgs.get(0), joinArgs.get(1));
                    joinArgs.remove(0);
                    joinArgs.remove(0);
                    for (final TupleExpr te : joinArgs) {
                        newJoin = new Join(newJoin, te);
                    }
                }
            } else if (joinArgs.size() == 1) {
                if (filterChain.size() > 0) {
                    newJoin = filterChain.get(0);
                    if (filterChain.size() == 1) {
                        ((Filter) newJoin).setArg(joinArgs.get(0));
                    } else {
                        ((Filter) filterChain.get(1)).setArg(joinArgs.get(0));
                    }
                } else {
                    newJoin = joinArgs.get(0);
                }
            } else {
                throw new IllegalStateException("JoinArgs size cannot be zero.");
            }
            return newJoin;
        }

        /**
         * Buckets every statement pattern with a constant predicate under both its subject and
         * object variable names, keeping only buckets that form a valid {@link StarQuery}.
         */
        private HashMultimap<String, StatementPattern> getVarBins(final List<TupleExpr> nodes) {
            final HashMultimap<String, StatementPattern> varMap = HashMultimap.create();
            for (final QueryModelNode node : nodes) {
                if (node instanceof StatementPattern) {
                    final StatementPattern sp = (StatementPattern) node;
                    if (sp.getPredicateVar().isConstant()) {
                        varMap.put(sp.getSubjectVar().getName(), sp);
                        varMap.put(sp.getObjectVar().getName(), sp);
                    }
                }
            }
            removeInvalidBins(varMap, true);
            return varMap;
        }

        /**
         * Removes the patterns of a consumed bin from every bucket, then drops buckets left with a
         * single pattern.
         */
        private void updateVarMap(final HashMultimap<String, StatementPattern> varMap, final Set<StatementPattern> bin) {
            for (final StatementPattern sp : bin) {
                varMap.remove(sp.getSubjectVar().getName(), sp);
                varMap.remove(sp.getObjectVar().getName(), sp);
            }
            removeInvalidBins(varMap, false);
        }

        /**
         * Drops unusable buckets: for a new map, buckets that are not a valid {@link StarQuery};
         * otherwise, buckets containing exactly one pattern.
         */
        private void removeInvalidBins(final HashMultimap<String, StatementPattern> varMap, final boolean newMap) {
            // Iterate over a copy of the keys because buckets are removed while iterating.
            final Set<String> keys = Sets.newHashSet(varMap.keySet());
            for (final String s : keys) {
                final Set<StatementPattern> spSet = Sets.newHashSet(varMap.get(s));
                final boolean invalid = newMap ? !StarQuery.isValidStarQuery(spSet) : spSet.size() == 1;
                if (invalid) {
                    for (final StatementPattern sp : spSet) {
                        varMap.remove(s, sp);
                    }
                }
            }
        }

        /**
         * Replaces the patterns of bucket {@code binName} in {@code joinArgs} with one
         * {@link EntityTupleSet} (appended at the end) and removes them from {@code varMap}.
         */
        private void constructTuple(final HashMultimap<String, StatementPattern> varMap, final List<TupleExpr> joinArgs,
                final String binName) {
            // Copy: varMap.get returns a live view that updateVarMap is about to empty.
            final Set<StatementPattern> bin = Sets.newHashSet(varMap.get(binName));
            final StarQuery sq = new StarQuery(bin);
            updateVarMap(varMap, bin);
            for (final StatementPattern sp : bin) {
                joinArgs.remove(sp);
            }
            joinArgs.add(new EntityTupleSet(sq, conf));
        }

        /**
         * Returns the bucket key with the highest priority, computed as
         * {@code size * getCardinality(bin) * getMinCardSp(bin)} and multiplied by 10 when the key
         * names a constant. Returns {@code ""} if no bucket scores above {@code -Double.MAX_VALUE}.
         */
        private String getHighestPriorityKey(final HashMultimap<String, StatementPattern> varMap) {
            double priority = -Double.MAX_VALUE;
            String priorityKey = "";
            for (final String key : varMap.keySet()) {
                final Set<StatementPattern> bin = varMap.get(key);
                double tempPriority = bin.size();
                tempPriority *= getCardinality(bin);
                tempPriority *= getMinCardSp(bin);
                // weight starQuery where common Var is constant slightly more -- this factor is
                // subject to change
                if (VarNameUtils.isConstant(key)) {
                    tempPriority *= 10;
                }
                if (tempPriority > priority) {
                    priority = tempPriority;
                    priorityKey = key;
                }
            }
            return priorityKey;
        }

        /**
         * Returns the smallest single-pattern cardinality reported by the DAO, {@code 1} when no
         * DAO is configured, or {@code Double.MAX_VALUE} if no cardinality could be obtained.
         */
        private double getMinCardSp(final Collection<StatementPattern> nodes) {
            if (eval == null) {
                return 1;
            }
            double cardinality = Double.MAX_VALUE;
            for (final StatementPattern sp : nodes) {
                try {
                    final double tempCard = eval.getCardinality(conf, sp);
                    if (tempCard < cardinality) {
                        cardinality = tempCard;
                    }
                } catch (final Exception e) {
                    // Skip this pattern; the others may still yield a usable estimate.
                    LOG.warn("Unable to compute the cardinality of {}", sp, e);
                }
            }
            return cardinality;
        }

        /**
         * Returns the smallest pairwise join selectivity among {@code spNodes} (computed with the
         * DAO in denormalized mode) divided by {@code spNodes.size() + 1}, or {@code 1} when no DAO
         * is configured. Stops at the first failing pair.
         * NOTE(review): assumes {@link #eval} is an {@link AccumuloSelectivityEvalDAO}; any other
         * DAO causes a ClassCastException here.
         */
        private double getCardinality(final Collection<StatementPattern> spNodes) {
            if (eval == null) {
                return 1;
            }
            final List<StatementPattern> nodes = Lists.newArrayList(spNodes);
            final AccumuloSelectivityEvalDAO ase = (AccumuloSelectivityEvalDAO) eval;
            double cardinality = Double.MAX_VALUE;
            ase.setDenormalized(true);
            try {
                for (int i = 0; i < nodes.size(); i++) {
                    for (int j = i + 1; j < nodes.size(); j++) {
                        final double tempCard = ase.getJoinSelect(conf, nodes.get(i), nodes.get(j));
                        if (tempCard < cardinality) {
                            cardinality = tempCard;
                        }
                    }
                }
            } catch (final Exception e) {
                LOG.warn("Unable to compute the join selectivity of a candidate star query.", e);
            } finally {
                // Always restore the DAO's normal mode, even if an Error escapes.
                ase.setDenormalized(false);
            }
            return cardinality / (nodes.size() + 1);
        }

        /**
         * Flattens a join hierarchy into its arguments. Filters are added and their argument is
         * flattened too. Joins with a {@link FixedStatementPattern} left argument or a
         * {@link DoNotExpandSP} right argument are kept whole as a single argument so that they
         * are not lost when the hierarchy is replaced.
         *
         * @param tupleExpr the expression to flatten
         * @param joinArgs  the list to append to
         * @return {@code joinArgs}
         */
        protected <L extends List<TupleExpr>> L getJoinArgs(final TupleExpr tupleExpr, final L joinArgs) {
            if (tupleExpr instanceof Join) {
                final Join join = (Join) tupleExpr;
                if (isOpaqueJoin(join)) {
                    joinArgs.add(join);
                } else {
                    getJoinArgs(join.getLeftArg(), joinArgs);
                    getJoinArgs(join.getRightArg(), joinArgs);
                }
            } else if (tupleExpr instanceof Filter) {
                joinArgs.add(tupleExpr);
                getJoinArgs(((Filter) tupleExpr).getArg(), joinArgs);
            } else {
                joinArgs.add(tupleExpr);
            }
            return joinArgs;
        }
    }
}