blob: 9ddb15a7975e21115ed2f4efb6f72446be018249 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.mongodb;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.persist.query.BatchRyaQuery;
import org.apache.rya.api.persist.query.RyaQuery;
import org.apache.rya.api.persist.query.RyaQueryEngine;
import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
import org.apache.rya.mongodb.iter.RyaStatementBindingSetCursorIterator;
import org.apache.rya.mongodb.iter.RyaStatementCursorIterator;
import org.bson.Document;
import org.calrissian.mango.collect.CloseableIterable;
import org.calrissian.mango.collect.CloseableIterables;
import org.openrdf.query.BindingSet;
import org.openrdf.query.impl.MapBindingSet;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import info.aduna.iteration.CloseableIteration;
/**
* Date: 7/17/12
* Time: 9:28 AM
*/
public class MongoDBQueryEngine implements RyaQueryEngine<StatefulMongoDBRdfConfiguration> {
private StatefulMongoDBRdfConfiguration configuration;
private final MongoDBStorageStrategy<RyaStatement> strategy = new SimpleMongoDBStorageStrategy();
@Override
public void setConf(final StatefulMongoDBRdfConfiguration conf) {
configuration = conf;
}
@Override
public StatefulMongoDBRdfConfiguration getConf() {
return configuration;
}
@Override
public CloseableIteration<RyaStatement, RyaDAOException> query(
final RyaStatement stmt, final StatefulMongoDBRdfConfiguration conf)
throws RyaDAOException {
checkNotNull(stmt);
checkNotNull(conf);
Entry<RyaStatement, BindingSet> entry = new AbstractMap.SimpleEntry<>(stmt, new MapBindingSet());
Collection<Entry<RyaStatement, BindingSet>> collection = Collections.singleton(entry);
return new RyaStatementCursorIterator(queryWithBindingSet(collection, conf));
}
@Override
public CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet(
final Collection<Entry<RyaStatement, BindingSet>> stmts,
final StatefulMongoDBRdfConfiguration conf) throws RyaDAOException {
checkNotNull(stmts);
checkNotNull(conf);
final Multimap<RyaStatement, BindingSet> rangeMap = HashMultimap.create();
//TODO: cannot span multiple tables here
try {
for (final Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
final RyaStatement stmt = stmtbs.getKey();
final BindingSet bs = stmtbs.getValue();
rangeMap.put(stmt, bs);
}
// TODO not sure what to do about regex ranges?
final RyaStatementBindingSetCursorIterator iterator = new RyaStatementBindingSetCursorIterator(
getCollection(conf), rangeMap, strategy, conf.getAuthorizations());
return iterator;
} catch (final Exception e) {
throw new RyaDAOException(e);
}
}
@Override
public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(
final Collection<RyaStatement> stmts, final StatefulMongoDBRdfConfiguration conf)
throws RyaDAOException {
final Map<RyaStatement, BindingSet> queries = new HashMap<>();
for (final RyaStatement stmt : stmts) {
queries.put(stmt, new MapBindingSet());
}
return new RyaStatementCursorIterator(queryWithBindingSet(queries.entrySet(), conf));
}
@Override
public CloseableIterable<RyaStatement> query(final RyaQuery ryaQuery)
throws RyaDAOException {
Preconditions.checkNotNull(ryaQuery);
return query(new BatchRyaQuery(Collections.singleton(ryaQuery.getQuery())));
}
@Override
public CloseableIterable<RyaStatement> query(final BatchRyaQuery batchRyaQuery)
throws RyaDAOException {
Preconditions.checkNotNull(batchRyaQuery);
final Map<RyaStatement, BindingSet> queries = new HashMap<>();
for (final RyaStatement stmt : batchRyaQuery.getQueries()) {
queries.put(stmt, new MapBindingSet());
}
Iterator<RyaStatement> iterator = new RyaStatementCursorIterator(queryWithBindingSet(queries.entrySet(), getConf()));
return CloseableIterables.wrap((Iterable<RyaStatement>) () -> iterator);
}
private MongoCollection<Document> getCollection(final StatefulMongoDBRdfConfiguration conf) {
final MongoDatabase db = conf.getMongoClient().getDatabase(conf.getMongoDBName());
return db.getCollection(conf.getTriplesCollectionName());
}
@Override
public void close() throws IOException {
}
}