blob: 1b205145a098a318a1f70f7cc1706844e80f7906 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.accumulo.entity;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.documentIndex.DocIndexIteratorUtil;
import org.apache.rya.accumulo.documentIndex.DocumentIndexIntersectingIterator;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.resolver.RyaContext;
import org.apache.rya.api.resolver.RyaToRdfConversions;
import org.apache.rya.api.resolver.RyaTypeResolverException;
import org.apache.rya.indexing.DocIdIndexer;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
import org.eclipse.rdf4j.query.algebra.helpers.StatementPatternCollector;
import org.eclipse.rdf4j.query.parser.ParsedQuery;
import org.eclipse.rdf4j.query.parser.sparql.SPARQLParser;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Sets;
import com.google.common.primitives.Bytes;
public class AccumuloDocIdIndexer implements DocIdIndexer {
private BatchScanner bs;
private final AccumuloRdfConfiguration conf;
public AccumuloDocIdIndexer(final RdfCloudTripleStoreConfiguration conf) throws AccumuloException, AccumuloSecurityException {
Preconditions.checkArgument(conf instanceof RdfCloudTripleStoreConfiguration, "conf must be isntance of RdfCloudTripleStoreConfiguration");
this.conf = (AccumuloRdfConfiguration) conf;
//Connector conn = ConfigUtils.getConnector(conf);
}
public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(final String sparqlQuery,
final Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException {
final SPARQLParser parser = new SPARQLParser();
ParsedQuery pq1 = null;
try {
pq1 = parser.parseQuery(sparqlQuery, null);
} catch (final MalformedQueryException e) {
e.printStackTrace();
throw new QueryEvaluationException("Malformed query. query=" + sparqlQuery, e);
}
final TupleExpr te1 = pq1.getTupleExpr();
final List<StatementPattern> spList1 = StatementPatternCollector.process(te1);
if(StarQuery.isValidStarQuery(spList1)) {
final StarQuery sq1 = new StarQuery(spList1);
return queryDocIndex(sq1, constraints);
} else {
throw new IllegalArgumentException("Invalid star query!");
}
}
@Override
public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(final StarQuery query,
final Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException {
final StarQuery starQ = query;
final Iterator<BindingSet> bs = constraints.iterator();
final Iterator<BindingSet> bs2 = constraints.iterator();
final Set<String> unCommonVarNames;
final Set<String> commonVarNames;
if (bs2.hasNext()) {
final BindingSet currBs = bs2.next();
commonVarNames = StarQuery.getCommonVars(query, currBs);
unCommonVarNames = Sets.difference(currBs.getBindingNames(), commonVarNames);
} else {
commonVarNames = Sets.newHashSet();
unCommonVarNames = Sets.newHashSet();
}
if( commonVarNames.size() == 1 && !query.commonVarConstant() && commonVarNames.contains(query.getCommonVarName())) {
final HashMultimap<String, BindingSet> map = HashMultimap.create();
final String commonVar = starQ.getCommonVarName();
final Iterator<Entry<Key, Value>> intersections;
final BatchScanner scan;
final Set<Range> ranges = Sets.newHashSet();
while(bs.hasNext()) {
final BindingSet currentBs = bs.next();
if(currentBs.getBinding(commonVar) == null) {
continue;
}
final String row = currentBs.getBinding(commonVar).getValue().stringValue();
ranges.add(new Range(row));
map.put(row, currentBs);
}
scan = runQuery(starQ, ranges);
intersections = scan.iterator();
return new CloseableIteration<BindingSet, QueryEvaluationException>() {
private QueryBindingSet currentSolutionBs = null;
private boolean hasNextCalled = false;
private boolean isEmpty = false;
private Iterator<BindingSet> inputSet = new ArrayList<BindingSet>().iterator();
private BindingSet currentBs;
private Key key;
@Override
public boolean hasNext() throws QueryEvaluationException {
if (!hasNextCalled && !isEmpty) {
while (inputSet.hasNext() || intersections.hasNext()) {
if (!inputSet.hasNext()) {
key = intersections.next().getKey();
inputSet = map.get(key.getRow().toString()).iterator();
}
currentBs = inputSet.next();
currentSolutionBs = deserializeKey(key, starQ, currentBs, unCommonVarNames);
if (currentSolutionBs.size() == unCommonVarNames.size() + starQ.getUnCommonVars().size() +1) {
hasNextCalled = true;
return true;
}
}
isEmpty = true;
return false;
} else {
return !isEmpty;
}
}
@Override
public BindingSet next() throws QueryEvaluationException {
if (hasNextCalled) {
hasNextCalled = false;
} else if (isEmpty) {
throw new NoSuchElementException();
} else {
if (this.hasNext()) {
hasNextCalled = false;
} else {
throw new NoSuchElementException();
}
}
return currentSolutionBs;
}
@Override
public void remove() throws QueryEvaluationException {
throw new UnsupportedOperationException();
}
@Override
public void close() throws QueryEvaluationException {
scan.close();
}
};
} else {
return new CloseableIteration<BindingSet, QueryEvaluationException>() {
@Override
public void remove() throws QueryEvaluationException {
throw new UnsupportedOperationException();
}
private Iterator<Entry<Key, Value>> intersections = null;
private QueryBindingSet currentSolutionBs = null;
private boolean hasNextCalled = false;
private boolean isEmpty = false;
private boolean init = false;
private BindingSet currentBs;
private StarQuery sq = new StarQuery(starQ);
private final Set<Range> emptyRangeSet = Sets.newHashSet();
private BatchScanner scan;
@Override
public BindingSet next() throws QueryEvaluationException {
if (hasNextCalled) {
hasNextCalled = false;
} else if (isEmpty) {
throw new NoSuchElementException();
} else {
if (this.hasNext()) {
hasNextCalled = false;
} else {
throw new NoSuchElementException();
}
}
return currentSolutionBs;
}
@Override
public boolean hasNext() throws QueryEvaluationException {
if (!init) {
if (intersections == null && bs.hasNext()) {
currentBs = bs.next();
sq = StarQuery.getConstrainedStarQuery(sq, currentBs);
scan = runQuery(sq,emptyRangeSet);
intersections = scan.iterator();
// binding set empty
} else if (intersections == null && !bs.hasNext()) {
currentBs = new QueryBindingSet();
scan = runQuery(starQ,emptyRangeSet);
intersections = scan.iterator();
}
init = true;
}
if (!hasNextCalled && !isEmpty) {
while (intersections.hasNext() || bs.hasNext()) {
if (!intersections.hasNext()) {
scan.close();
currentBs = bs.next();
sq = StarQuery.getConstrainedStarQuery(sq, currentBs);
scan = runQuery(sq,emptyRangeSet);
intersections = scan.iterator();
}
if (intersections.hasNext()) {
currentSolutionBs = deserializeKey(intersections.next().getKey(), sq, currentBs,
unCommonVarNames);
} else {
continue;
}
if (sq.commonVarConstant() && currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size()) {
hasNextCalled = true;
return true;
} else if(currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size() + 1) {
hasNextCalled = true;
return true;
}
}
isEmpty = true;
return false;
} else {
return !isEmpty;
}
}
@Override
public void close() throws QueryEvaluationException {
scan.close();
}
};
}
}
private QueryBindingSet deserializeKey(final Key key, final StarQuery sq, final BindingSet currentBs, final Set<String> unCommonVar) {
final QueryBindingSet currentSolutionBs = new QueryBindingSet();
final Text row = key.getRow();
final Text cq = key.getColumnQualifier();
final String[] cqArray = cq.toString().split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM);
boolean commonVarSet = false;
//if common Var is constant there is no common variable to assign a value to
if(sq.commonVarConstant()) {
commonVarSet = true;
}
if (!commonVarSet && sq.isCommonVarURI()) {
final RyaURI rURI = new RyaURI(row.toString());
currentSolutionBs.addBinding(sq.getCommonVarName(),
RyaToRdfConversions.convertValue(rURI));
commonVarSet = true;
}
for (final String s : sq.getUnCommonVars()) {
final byte[] cqBytes = cqArray[sq.getVarPos().get(s)].getBytes(StandardCharsets.UTF_8);
final int firstIndex = Bytes.indexOf(cqBytes, DELIM_BYTE);
final int secondIndex = Bytes.lastIndexOf(cqBytes, DELIM_BYTE);
final int typeIndex = Bytes.indexOf(cqBytes, TYPE_DELIM_BYTE);
final String tripleComponent = new String(Arrays.copyOfRange(cqBytes, firstIndex + 1, secondIndex), StandardCharsets.UTF_8);
final byte[] cqContent = Arrays.copyOfRange(cqBytes, secondIndex + 1, typeIndex);
final byte[] objType = Arrays.copyOfRange(cqBytes, typeIndex, cqBytes.length);
if (tripleComponent.equals("object")) {
final byte[] object = Bytes.concat(cqContent, objType);
org.eclipse.rdf4j.model.Value v = null;
try {
v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize(
object));
} catch (final RyaTypeResolverException e) {
e.printStackTrace();
}
currentSolutionBs.addBinding(s, v);
} else if (tripleComponent.equals("subject")) {
if (!commonVarSet) {
final byte[] object = Bytes.concat(row.getBytes(), objType);
org.eclipse.rdf4j.model.Value v = null;
try {
v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize(
object));
} catch (final RyaTypeResolverException e) {
e.printStackTrace();
}
currentSolutionBs.addBinding(sq.getCommonVarName(), v);
commonVarSet = true;
}
final RyaURI rURI = new RyaURI(new String(cqContent, StandardCharsets.UTF_8));
currentSolutionBs.addBinding(s, RyaToRdfConversions.convertValue(rURI));
} else {
throw new IllegalArgumentException("Invalid row.");
}
}
for (final String s : unCommonVar) {
currentSolutionBs.addBinding(s, currentBs.getValue(s));
}
return currentSolutionBs;
}
private BatchScanner runQuery(final StarQuery query, Collection<Range> ranges) throws QueryEvaluationException {
try {
if (ranges.size() == 0) {
final String rangeText = query.getCommonVarValue();
Range r;
if (rangeText != null) {
r = new Range(new Text(query.getCommonVarValue()));
} else {
r = new Range();
}
ranges = Collections.singleton(r);
}
final Connector accCon = ConfigUtils.getConnector(conf);
final IteratorSetting is = new IteratorSetting(30, "fii", DocumentIndexIntersectingIterator.class);
DocumentIndexIntersectingIterator.setColumnFamilies(is, query.getColumnCond());
if (query.hasContext()) {
DocumentIndexIntersectingIterator.setContext(is, query.getContextURI());
}
final Authorizations auths;
final String authsStr = conf.get(ConfigUtils.CLOUDBASE_AUTHS);
if(authsStr == null || authsStr.isEmpty()) {
auths = new Authorizations();
} else {
auths = new Authorizations(authsStr);
}
bs = accCon.createBatchScanner(EntityCentricIndex.getTableName(conf), auths, 15);
bs.addScanIterator(is);
bs.setRanges(ranges);
return bs;
} catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
throw new QueryEvaluationException(e);
}
}
@Override
public void close() throws IOException {
//TODO generate an exception when BS passed in -- scanner closed
// if (bs != null) {
// bs.close();
// }
}
}