blob: 20f5768437af341a82d0c3939464eb21323e126c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.external.tupleSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.rya.accumulo.pcj.iterators.BindingSetHashJoinIterator;
import org.apache.rya.accumulo.pcj.iterators.BindingSetHashJoinIterator.HashJoinType;
import org.apache.rya.accumulo.pcj.iterators.IteratorCombiner;
import org.apache.rya.accumulo.pcj.iterators.PCJKeyToCrossProductBindingSetIterator;
import org.apache.rya.accumulo.pcj.iterators.PCJKeyToJoinBindingSetIterator;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.domain.VarNameUtils;
import org.apache.rya.api.utils.IteratorWrapper;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.pcj.matching.PCJOptimizerUtilities;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.Binding;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.Projection;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
import org.eclipse.rdf4j.query.impl.SimpleBinding;
import org.eclipse.rdf4j.query.parser.ParsedTupleQuery;
import org.eclipse.rdf4j.query.parser.sparql.SPARQLParser;
import org.eclipse.rdf4j.sail.SailException;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
/**
* During query planning, this node is inserted into the parsed query to
* represent part of the original query (a sub-query). This sub-query is the
* value returned by {@link ExternalTupleSet#getTupleExpr()}. The results
* associated with this sub-query are stored in an external Accumulo table,
* where accCon and tablename are the associated {@link Connector} and table
* name. During evaluation, the portion of the query in {@link AccumuloIndexSet}
* is evaluated by scanning the external Accumulo table. This class is extremely
* useful for caching queries and reusing results from previous SPARQL queries.
* <p>
*
* The {@link TupleExpr} returned by {@link ExternalTupleSet#getTupleExpr()}
* may use different variable names than the query and the variables stored in
* the external Accumulo table. The mapping of variables from the TupleExpr to
* the table variables is given by {@link ExternalTupleSet#getTableVarMap()}. In
* addition to allowing the variables to differ, it is possible for TupleExpr to
* have fewer variables than the table query--that is, some of the variables in
* the table query may appear as constants in the TupleExpr. These constants
* are extracted from TupleExpr by the method
* {@link AccumuloIndexSet#getConstantConstraints()} and by the Visitor
* {@link ValueMapVisitor} to be used as constraints when scanning the Accumulo
* table. This allows the pre-computed results to be used for a larger class
* of sub-queries.
*
*/
public class AccumuloIndexSet extends ExternalTupleSet implements
        ExternalBatchingIterator {

    /** Connector to the Accumulo instance where the PCJ results are stored. */
    private final Connector accCon;

    /** Name of the Accumulo table holding the PCJ results. */
    private final String tablename;

    /**
     * Variable orders (as {@link VariableOrder#toString()} strings) in which
     * results are written to the table. Each order is also registered as a
     * locality group, and the first entry is the default column family used
     * for full-table scans in {@link #evaluate(Collection)}.
     */
    private final List<String> varOrder;

    private final PcjTables pcj = new PcjTables();

    /** Scan authorizations taken from the query configuration. */
    private final Authorizations auths;

    @Override
    public Map<String, Set<String>> getSupportedVariableOrders() {
        return this.getSupportedVariableOrderMap();
    }

    @Override
    public String getSignature() {
        return "AccumuloIndexSet(" + tablename + ") : "
                + Joiner.on(", ").join(this.getTupleExpr().getBindingNames());
    }

    /**
     * Creates an index set over an existing PCJ table, using an explicitly
     * supplied SPARQL query as the sub-query represented by this node.
     *
     * @param sparql
     *            - SPARQL query whose results are stored in the PCJ table
     * @param conf
     *            - configuration used to obtain the Accumulo {@link Connector}
     *            and the query authorizations
     * @param tablename
     *            - name of an existing PCJ table
     * @throws MalformedQueryException
     *             if the query cannot be parsed or contains no Projection
     * @throws IllegalArgumentException
     *             if the parsed query is not a valid PCJ
     * @throws SailException
     * @throws QueryEvaluationException
     * @throws TableNotFoundException
     * @throws AccumuloSecurityException
     * @throws AccumuloException
     * @throws PCJStorageException
     *             if the PCJ table's metadata cannot be read
     */
    public AccumuloIndexSet(final String sparql, final Configuration conf,
            final String tablename) throws MalformedQueryException, SailException,
            QueryEvaluationException, TableNotFoundException, AccumuloException,
            AccumuloSecurityException, PCJStorageException {
        this.tablename = tablename;
        this.accCon = ConfigUtils.getConnector(conf);
        this.auths = getAuthorizations(conf);

        final SPARQLParser sp = new SPARQLParser();
        final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null);
        final TupleExpr te = pq.getTupleExpr();
        Preconditions.checkArgument(PCJOptimizerUtilities.isPCJValid(te),
                "TupleExpr is an invalid PCJ.");

        final Optional<Projection> projection = new ParsedQueryUtil().findProjection(pq);
        if (!projection.isPresent()) {
            throw new MalformedQueryException("SPARQL query '" + sparql
                    + "' does not contain a Projection.");
        }
        setProjectionExpr(projection.get());

        varOrder = toOrderStrings(pcj.getPcjMetadata(accCon, tablename).getVarOrders());
        setLocalityGroups(tablename, accCon, varOrder);
        this.setSupportedVariableOrderMap(varOrder);
    }

    /**
     * Creates an index set over an existing PCJ table, using the SPARQL query
     * recorded in that table's metadata.
     *
     * @param conf
     *            - configuration used to obtain the Accumulo {@link Connector}
     *            and the query authorizations
     * @param tablename
     *            - name of an existing PCJ table
     * @throws MalformedQueryException
     *             if the stored query cannot be parsed or contains no Projection
     * @throws SailException
     * @throws QueryEvaluationException
     * @throws TableNotFoundException
     * @throws AccumuloSecurityException
     * @throws AccumuloException
     * @throws PCJStorageException
     *             if the PCJ table's metadata cannot be read
     */
    public AccumuloIndexSet(final Configuration conf, final String tablename)
            throws MalformedQueryException, SailException,
            QueryEvaluationException, TableNotFoundException, AccumuloException,
            AccumuloSecurityException, PCJStorageException {
        this.tablename = tablename;
        this.accCon = ConfigUtils.getConnector(conf);
        this.auths = getAuthorizations(conf);

        final PcjMetadata meta = pcj.getPcjMetadata(accCon, tablename);
        final String sparql = meta.getSparql();
        final SPARQLParser sp = new SPARQLParser();
        final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null);
        // Locate the Projection the same way the SPARQL-based constructor does instead
        // of casting the root node: the root may be a modifier node wrapping the
        // Projection (e.g. for DISTINCT/LIMIT), which would fail a direct cast.
        final Optional<Projection> projection = new ParsedQueryUtil().findProjection(pq);
        if (!projection.isPresent()) {
            throw new MalformedQueryException("SPARQL query '" + sparql
                    + "' does not contain a Projection.");
        }
        setProjectionExpr(projection.get());

        varOrder = toOrderStrings(meta.getVarOrders());
        setLocalityGroups(tablename, accCon, varOrder);
        this.setSupportedVariableOrderMap(varOrder);
    }

    /**
     * Converts the variable orders stored in the PCJ table's metadata to their
     * string representation.
     *
     * @param orders - variable orders of the PCJ table
     * @return - a mutable list holding {@code toString()} of each order, in
     *         the set's iteration order
     */
    private static List<String> toOrderStrings(final Set<VariableOrder> orders) {
        final List<String> orderStrings = new ArrayList<>(orders.size());
        for (final VariableOrder order : orders) {
            orderStrings.add(order.toString());
        }
        return orderStrings;
    }

    /**
     * Builds scan authorizations from the comma-separated
     * {@link RdfCloudTripleStoreConfiguration#CONF_QUERY_AUTH} property; an
     * unset or empty property yields empty authorizations.
     */
    private static Authorizations getAuthorizations(final Configuration conf) {
        final String authString = conf.get(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "");
        if (authString.isEmpty()) {
            return new Authorizations();
        }
        return new Authorizations(authString.split(","));
    }

    /**
     * Returns the cardinality recorded in the PCJ table's metadata, for use in
     * query planning.
     *
     * @return - the stored cardinality, or {@code 0} if the metadata could not
     *         be read
     */
    @Override
    public double cardinality() {
        double cardinality = 0;
        try {
            cardinality = pcj.getPcjMetadata(accCon, tablename).getCardinality();
        } catch (final PcjException e) {
            // NOTE(review): failure is only printed and 0 is returned; consider
            // routing this through the project's logger.
            e.printStackTrace();
        }
        return cardinality;
    }

    /**
     * Registers one locality group per variable order so that scans restricted
     * to a single order (column family) are more efficient.
     *
     * @param tableName
     *            - table whose locality groups are set
     * @param conn
     *            - connector used to reach the table
     * @param groups
     *            - variable orders; each becomes the sole column family of a
     *            locality group named after the order with
     *            {@code VALUE_DELIM} removed
     */
    private void setLocalityGroups(final String tableName, final Connector conn,
            final List<String> groups) {
        final Map<String, Set<Text>> localityGroups = new HashMap<>();
        for (final String group : groups) {
            final Set<Text> columnFamilies = new HashSet<>();
            columnFamilies.add(new Text(group));
            localityGroups.put(group.replace(VALUE_DELIM, ""), columnFamilies);
        }
        try {
            conn.tableOperations().setLocalityGroups(tableName, localityGroups);
        } catch (AccumuloException | AccumuloSecurityException
                | TableNotFoundException e) {
            // NOTE(review): best-effort; failure is only printed.
            e.printStackTrace();
        }
    }

    @Override
    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(
            final BindingSet bindingset) throws QueryEvaluationException {
        return this.evaluate(Collections.singleton(bindingset));
    }

    /**
     * Core evaluation method used during query evaluation - given a collection
     * of binding set constraints, this method finds common binding labels
     * between the constraints and table, uses those to build a prefix scan of
     * the Accumulo table, and creates a solution binding set by iterating over
     * the scan results.
     *
     * @param bindingset - collection of {@link BindingSet}s to be joined with PCJ
     * @return - CloseableIteration over joined results
     * @throws QueryEvaluationException if building the scans or iterators fails
     */
    @Override
    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(
            final Collection<BindingSet> bindingset)
            throws QueryEvaluationException {
        if (bindingset.isEmpty()) {
            return new IteratorWrapper<BindingSet, QueryEvaluationException>(
                    new HashSet<BindingSet>().iterator());
        }
        // BindingSets sharing no assured variable with the PCJ (cross products)
        final List<BindingSet> crossProductBs = new ArrayList<>();
        // constant constraints that could not be folded into a scan prefix
        final Map<String, Value> constantConstraints = new HashMap<>();
        final Set<Range> hashJoinRanges = new HashSet<>();
        // Sentinel compared by identity below to detect whether a cross
        // product range has already been computed.
        final Range EMPTY_RANGE = new Range("", true, "~", false);
        Range crossProductRange = EMPTY_RANGE;
        String localityGroupOrder = varOrder.get(0);
        int maxPrefixLen = Integer.MIN_VALUE;
        int prefixLen = 0;
        int oldPrefixLen = 0;
        final Multimap<String, BindingSet> bindingSetHashMap = HashMultimap.create();
        HashJoinType joinType = HashJoinType.CONSTANT_JOIN_VAR;
        final Set<String> unAssuredVariables = Sets.difference(getTupleExpr().getBindingNames(),
                getTupleExpr().getAssuredBindingNames());
        boolean useColumnScan = false;
        boolean isCrossProd = false;
        final BindingSet constants = getConstantConstraints();
        final boolean containsConstantConstraints = constants.size() > 0;
        try {
            for (final BindingSet bs : bindingset) {
                if (bindingset.size() == 1 && bs.size() == 0) {
                    // only a single, empty bindingset: the pcj node is the
                    // first node in the query plan - use a full Range scan
                    // with the column family set
                    useColumnScan = true;
                }
                // get common vars for PCJ - only use variables associated
                // with assured Bindings
                final QueryBindingSet commonVars = new QueryBindingSet();
                for (final String b : getTupleExpr().getAssuredBindingNames()) {
                    final Binding v = bs.getBinding(b);
                    if (v != null) {
                        commonVars.addBinding(v);
                    }
                }
                // no common vars implies cross product
                if (commonVars.size() == 0 && bs.size() != 0) {
                    crossProductBs.add(bs);
                    isCrossProd = true;
                }
                // get a varOrder from orders in PCJ table - use at least
                // one common variable
                final BindingSetVariableOrder bsOrder = getVarOrder(
                        commonVars.getBindingNames(),
                        constants.getBindingNames());
                // update constant constraints not used in the var order and
                // remove unused variables from the Bindings that form the range
                commonVars.addAll(constants);
                if (commonVars.size() > bsOrder.varOrderLen) {
                    final Map<String, Value> valMap = getConstantValueMap();
                    // snapshot: unusedVars may be a view over commonVars' names,
                    // which removeBinding mutates
                    for (final String s : new HashSet<String>(bsOrder.unusedVars)) {
                        if (valMap.containsKey(s)
                                && !constantConstraints.containsKey(s)) {
                            constantConstraints.put(s, valMap.get(s));
                        }
                        commonVars.removeBinding(s);
                    }
                }
                if (containsConstantConstraints
                        && (useColumnScan || isCrossProd)) {
                    // only one range required for a cross product or an empty
                    // BindingSet; it is either the full table Range or one
                    // determined by the constant constraints
                    if (crossProductRange == EMPTY_RANGE) {
                        crossProductRange = getRange(bsOrder.varOrder, commonVars);
                        localityGroupOrder = prefixToOrder(bsOrder.varOrder);
                    }
                } else if (!useColumnScan && !isCrossProd) {
                    // not a cross product: add a range and register the
                    // BindingSet in the hash join map
                    hashJoinRanges.add(getRange(bsOrder.varOrder, commonVars));
                    prefixLen = bsOrder.varOrderLen;
                    // If the common variable prefix length changes between
                    // BindingSets (happens with OPTIONAL), switch the join
                    // type to VARIABLE_JOIN_VAR.
                    if (oldPrefixLen == 0) {
                        oldPrefixLen = prefixLen;
                    } else {
                        if (oldPrefixLen != prefixLen
                                && joinType == HashJoinType.CONSTANT_JOIN_VAR) {
                            joinType = HashJoinType.VARIABLE_JOIN_VAR;
                        }
                        oldPrefixLen = prefixLen;
                    }
                    if (prefixLen > maxPrefixLen) {
                        maxPrefixLen = prefixLen;
                    }
                    final String key = getHashJoinKey(bsOrder.varOrder, commonVars);
                    bindingSetHashMap.put(key, bs);
                }
                isCrossProd = false;
            }
            // create full Range scan iterator and set column family if empty
            // collection or if cross product BindingSet exists and no hash join
            // BindingSets
            if ((useColumnScan || crossProductBs.size() > 0)
                    && bindingSetHashMap.size() == 0) {
                final Scanner scanner = accCon.createScanner(tablename, auths);
                // cross product with no cross product constraints here
                scanner.setRange(crossProductRange);
                scanner.fetchColumnFamily(new Text(localityGroupOrder));
                return new PCJKeyToCrossProductBindingSetIterator(scanner,
                        crossProductBs, constantConstraints, unAssuredVariables, getTableVarMap());
            } else if ((useColumnScan || crossProductBs.size() > 0)
                    && bindingSetHashMap.size() > 0) {
                // both hash join BindingSets and cross product BindingSets
                // exist: build one iterator for each, then combine them
                final List<CloseableIteration<BindingSet, QueryEvaluationException>> iteratorList = new ArrayList<>();
                // create cross product iterator
                final Scanner scanner1 = accCon.createScanner(tablename, auths);
                scanner1.setRange(crossProductRange);
                scanner1.fetchColumnFamily(new Text(localityGroupOrder));
                iteratorList.add(new PCJKeyToCrossProductBindingSetIterator(
                        scanner1, crossProductBs, constantConstraints, unAssuredVariables,
                        getTableVarMap()));
                // create hash join iterator
                final BatchScanner scanner2 = accCon.createBatchScanner(tablename, auths, 10);
                scanner2.setRanges(hashJoinRanges);
                final PCJKeyToJoinBindingSetIterator iterator = new PCJKeyToJoinBindingSetIterator(
                        scanner2, getTableVarMap(), maxPrefixLen);
                iteratorList.add(new BindingSetHashJoinIterator(
                        bindingSetHashMap, iterator, unAssuredVariables, joinType));
                // combine iterators
                return new IteratorCombiner(iteratorList);
            } else {
                // only hash join BindingSets exist
                final BatchScanner scanner = accCon.createBatchScanner(tablename, auths, 10);
                scanner.setRanges(hashJoinRanges);
                final PCJKeyToJoinBindingSetIterator iterator = new PCJKeyToJoinBindingSetIterator(
                        scanner, getTableVarMap(), maxPrefixLen);
                return new BindingSetHashJoinIterator(bindingSetHashMap,
                        iterator, unAssuredVariables, joinType);
            }
        } catch (final Exception e) {
            throw new QueryEvaluationException(e);
        }
    }

    /**
     * Builds the hash join key for a BindingSet: the string values of the
     * variables in {@code commonVarOrder}, in order, joined by
     * {@code VALUE_DELIM}.
     *
     * @param commonVarOrder - variable order whose variables form the key
     * @param bs - BindingSet that must bind every variable in the order
     * @return - the join key
     */
    private String getHashJoinKey(final String commonVarOrder, final BindingSet bs) {
        final String[] commonVarArray = commonVarOrder.split(VAR_ORDER_DELIM);
        final StringBuilder key = new StringBuilder(bs.getValue(commonVarArray[0]).toString());
        for (int i = 1; i < commonVarArray.length; i++) {
            key.append(VALUE_DELIM).append(bs.getValue(commonVarArray[i]).toString());
        }
        return key.toString();
    }

    /**
     * Serializes the values of {@code bs} in the given variable order and
     * returns the Accumulo prefix Range covering all rows starting with it.
     */
    private Range getRange(final String commonVarOrder, final BindingSet bs)
            throws BindingSetConversionException {
        final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
        final byte[] rangePrefix = converter.convert(bs, new VariableOrder(commonVarOrder));
        return Range.prefix(new Text(rangePrefix));
    }

    /**
     * Selects the longest supported variable order whose variables are all
     * available from {@code variableBindingNames} and
     * {@code constantBindingNames}.
     *
     * @param variableBindingNames
     *            - names corresponding to variables
     * @param constantBindingNames
     *            - names corresponding to constant constraints
     * @return - {@link BindingSetVariableOrder} object containing largest
     *         possible supported variable order built from
     *         variableBindingNames and constantBindingNames. If both inputs
     *         are empty the order is {@code ""}; if no supported order
     *         qualifies, the order and unused-variable set are {@code null}.
     */
    private BindingSetVariableOrder getVarOrder(
            final Set<String> variableBindingNames, final Set<String> constantBindingNames) {
        if (variableBindingNames.isEmpty() && constantBindingNames.isEmpty()) {
            return new BindingSetVariableOrder("", 0, new HashSet<String>());
        }
        // When both variables and constants are available, a candidate order
        // must contain at least one of the variables, not only constants.
        final boolean requireVariable = !variableBindingNames.isEmpty()
                && !constantBindingNames.isEmpty();
        final Set<String> variables = Sets.union(variableBindingNames, constantBindingNames);

        String maxPrefix = null;
        int maxPrefixLen = 0;
        Set<String> minUnusedVariables = null;
        for (final Map.Entry<String, Set<String>> e : this.getSupportedVariableOrders().entrySet()) {
            final Set<String> value = e.getValue();
            if (maxPrefixLen < value.size()
                    && variables.containsAll(value)
                    && (!requireVariable
                            || !Sets.intersection(value, variableBindingNames).isEmpty())) {
                maxPrefixLen = value.size();
                maxPrefix = e.getKey();
                minUnusedVariables = Sets.difference(variables, value);
                if (maxPrefixLen == variables.size()) {
                    // every available variable is used; no longer order exists
                    break;
                }
            }
        }
        return new BindingSetVariableOrder(maxPrefix, maxPrefixLen,
                minUnusedVariables);
    }

    /**
     * @return - all constraints which correspond to variables in
     *         {@link AccumuloIndexSet#getTupleExpr()} which are set equal to a
     *         constant, but are non-constant in Accumulo table
     */
    private BindingSet getConstantConstraints() {
        final Map<String, Value> valMap = getConstantValueMap();
        final QueryBindingSet constants = new QueryBindingSet();
        for (final String s : this.getTableVarMap().keySet()) {
            if (VarNameUtils.isConstant(s)) {
                constants.addBinding(new SimpleBinding(s, valMap.get(s)));
            }
        }
        return constants;
    }

    /**
     * Translates a partial variable order expressed in query variables into
     * PCJ table variables and returns the first full table variable order
     * that begins with it. The returned order is used as the locality group
     * (column family) to fetch.
     *
     * @param order - prefix of a full variable order, in query variables
     * @return - full variable order, in PCJ table variables, that starts with
     *         the translated prefix
     * @throws NoSuchElementException if no table variable order starts with it
     */
    private String prefixToOrder(final String order) {
        final Map<String, String> tableVarMap = this.getTableVarMap();
        final String[] queryVars = order.split(VAR_ORDER_DELIM);
        final String[] pcjVars = new String[queryVars.length];
        for (int i = 0; i < queryVars.length; i++) {
            pcjVars[i] = tableVarMap.get(queryVars[i]);
        }
        final String pcjOrder = Joiner.on(VAR_ORDER_DELIM).join(pcjVars);
        // Match whole variable names only: a bare startsWith would let "x"
        // match an order beginning with "xy".
        final String pcjOrderPrefix = pcjOrder + VAR_ORDER_DELIM;
        for (final String s : varOrder) {
            if (s.equals(pcjOrder) || s.startsWith(pcjOrderPrefix)) {
                return s;
            }
        }
        throw new NoSuchElementException("Order is not a prefix of any locality group value!");
    }

    /**
     * Result of {@link AccumuloIndexSet#getVarOrder}: the chosen variable
     * order, its number of variables, and the available variables it does not
     * use.
     */
    private static final class BindingSetVariableOrder {
        final Set<String> unusedVars;
        final int varOrderLen;
        final String varOrder;

        BindingSetVariableOrder(final String varOrder, final int len,
                final Set<String> unused) {
            this.varOrder = varOrder;
            this.varOrderLen = len;
            this.unusedVars = unused;
        }
    }
}