blob: d66d1b62ef10f4315232bfdec1c082ad0e8b3971 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.mongodb.iter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.log4j.Logger;
import org.apache.rya.api.RdfCloudTripleStoreUtils;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
import org.apache.rya.mongodb.document.operators.aggregation.AggregationUtil;
import org.bson.Document;
import org.openrdf.query.BindingSet;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.mongodb.DBObject;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.util.JSON;
import info.aduna.iteration.CloseableIteration;
public class RyaStatementBindingSetCursorIterator implements CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> {
private static final Logger log = Logger.getLogger(RyaStatementBindingSetCursorIterator.class);
private static final int QUERY_BATCH_SIZE = 50;
private final MongoCollection<Document> coll;
private final Multimap<RyaStatement, BindingSet> rangeMap;
private final Multimap<RyaStatement, BindingSet> executedRangeMap = HashMultimap.create();
private final Iterator<RyaStatement> queryIterator;
private Iterator<Document> batchQueryResultsIterator;
private RyaStatement currentResultStatement;
private Iterator<BindingSet> currentBindingSetIterator;
private final MongoDBStorageStrategy<RyaStatement> strategy;
private final Authorizations auths;
public RyaStatementBindingSetCursorIterator(final MongoCollection<Document> coll,
final Multimap<RyaStatement, BindingSet> rangeMap, final MongoDBStorageStrategy<RyaStatement> strategy,
final Authorizations auths) {
this.coll = coll;
this.rangeMap = rangeMap;
queryIterator = rangeMap.keySet().iterator();
this.strategy = strategy;
this.auths = auths;
}
@Override
public boolean hasNext() {
if (!currentBindingSetIteratorIsValid()) {
findNextResult();
}
return currentBindingSetIteratorIsValid();
}
@Override
public Entry<RyaStatement, BindingSet> next() {
if (!currentBindingSetIteratorIsValid()) {
findNextResult();
}
if (currentBindingSetIteratorIsValid()) {
final BindingSet currentBindingSet = currentBindingSetIterator.next();
return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(currentResultStatement, currentBindingSet);
}
return null;
}
private boolean currentBindingSetIteratorIsValid() {
return currentBindingSetIterator != null && currentBindingSetIterator.hasNext();
}
private void findNextResult() {
if (!currentBatchQueryResultCursorIsValid()) {
submitBatchQuery();
}
if (currentBatchQueryResultCursorIsValid()) {
// convert to Rya Statement
final Document queryResult = batchQueryResultsIterator.next();
final DBObject dbo = (DBObject) JSON.parse(queryResult.toJson());
currentResultStatement = strategy.deserializeDBObject(dbo);
// Find all of the queries in the executed RangeMap that this result matches
// and collect all of those binding sets
final Set<BindingSet> bsList = new HashSet<>();
for (final RyaStatement executedQuery : executedRangeMap.keys()) {
if (isResultForQuery(executedQuery, currentResultStatement)) {
bsList.addAll(executedRangeMap.get(executedQuery));
}
}
currentBindingSetIterator = bsList.iterator();
}
// Handle case of invalid currentResultStatement or no binding sets returned
if ((currentBindingSetIterator == null || !currentBindingSetIterator.hasNext()) && (currentBatchQueryResultCursorIsValid() || queryIterator.hasNext())) {
findNextResult();
}
}
private static boolean isResultForQuery(final RyaStatement query, final RyaStatement result) {
return isResult(query.getSubject(), result.getSubject()) &&
isResult(query.getPredicate(), result.getPredicate()) &&
isResult(query.getObject(), result.getObject()) &&
isResult(query.getContext(), result.getContext());
}
private static boolean isResult(final RyaType query, final RyaType result) {
return query == null || query.equals(result);
}
private void submitBatchQuery() {
int count = 0;
executedRangeMap.clear();
final List<Document> pipeline = new ArrayList<>();
final List<DBObject> match = new ArrayList<>();
while (queryIterator.hasNext() && count < QUERY_BATCH_SIZE){
count++;
final RyaStatement query = queryIterator.next();
executedRangeMap.putAll(query, rangeMap.get(query));
final DBObject currentQuery = strategy.getQuery(query);
match.add(currentQuery);
}
if (match.size() > 1) {
pipeline.add(new Document("$match", new Document("$or", match)));
} else if (match.size() == 1) {
pipeline.add(new Document("$match", match.get(0)));
} else {
batchQueryResultsIterator = Collections.emptyIterator();
return;
}
// Executing redact aggregation to only return documents the user has access to.
pipeline.addAll(AggregationUtil.createRedactPipeline(auths));
log.trace(pipeline);
final AggregateIterable<Document> aggIter = coll.aggregate(pipeline);
aggIter.batchSize(1000);
batchQueryResultsIterator = aggIter.iterator();
}
private boolean currentBatchQueryResultCursorIsValid() {
return batchQueryResultsIterator != null && batchQueryResultsIterator.hasNext();
}
@Override
public void close() throws RyaDAOException {
// TODO don't know what to do here
}
@Override
public void remove() throws RyaDAOException {
next();
}
}