// blob: 2428e289e7bc9e9c7fd0d69796eb01f260a15962 [file] [log] [blame]
package org.apache.rya.indexing.mongodb;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.resolver.RyaToRdfConversions;
import org.apache.rya.indexing.StatementConstraints;
import org.apache.rya.mongodb.MongoConnectorFactory;
import org.apache.rya.mongodb.MongoDBRdfConfiguration;
import org.apache.rya.mongodb.MongoDBRyaDAO;
import org.apache.rya.mongodb.MongoSecondaryIndex;
import org.openrdf.model.Literal;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.query.QueryEvaluationException;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.QueryBuilder;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
import info.aduna.iteration.CloseableIteration;
/**
 * Base class for secondary indexers that store their index in MongoDB.
 * <p>
 * Concrete indexers supply the collection name suffix via
 * {@link #getCollectionName()} and are expected to populate
 * {@link #predicates} and {@link #storageStrategy} (presumably from their
 * {@code init()} implementation, which this class invokes but does not
 * declare -- confirm against {@link MongoSecondaryIndex}).
 *
 * @param <T> - The {@link IndexingMongoDBStorageStrategy} this indexer uses.
 */
public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrategy> implements MongoSecondaryIndex {
    private static final Logger LOG = Logger.getLogger(AbstractMongoIndexer.class);

    /** Set once {@code init()} has been run by {@link #setConf} or {@link #initIndexer}. */
    private boolean isInit = false;
    protected Configuration conf;
    protected MongoDBRyaDAO dao;
    protected MongoClient mongoClient;
    protected String dbName;
    protected DB db;
    protected DBCollection collection;
    protected Set<URI> predicates;
    protected T storageStrategy;

    /**
     * Resolves the database and collection this indexer works against.
     * <p>
     * The database name is read from {@link MongoDBRdfConfiguration#MONGO_DB_NAME};
     * the collection name is the configured collection prefix (default
     * {@code "rya"}) followed by {@link #getCollectionName()}. Requires both
     * {@link #conf} and {@link #mongoClient} to be set.
     */
    protected void initCore() {
        dbName = conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME);
        db = this.mongoClient.getDB(dbName);
        final String collectionPrefix = conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya");
        collection = db.getCollection(collectionPrefix + getCollectionName());
    }

    /**
     * Sets the client used to reach MongoDB.
     *
     * @param client the client to use for subsequent database access.
     */
    @Override
    public void setClient(final MongoClient client) {
        this.mongoClient = client;
    }

    /**
     * Initializes this indexer against an explicitly supplied client.
     * <p>
     * The host and port of the client's address are written into
     * {@code conf} so the configuration matches the client in use.
     *
     * @param conf the configuration to use; it is modified by this call.
     * @param client the client to use for database access.
     */
    @VisibleForTesting
    public void initIndexer(final Configuration conf, final MongoClient client) {
        setClient(client);
        final ServerAddress address = client.getAddress();
        conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE, address.getHost());
        conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT, Integer.toString(address.getPort()));
        setConf(conf);
        // setConf normally performs the one-time init; this guard only fires
        // if a subclass overrides setConf without initializing.
        if (!isInit) {
            init();
            isInit = true;
        }
    }

    /**
     * Stores the configuration and, the first time it is called, initializes
     * the indexer.
     * <p>
     * A client obtained from {@link MongoConnectorFactory} is used only when
     * no client has been supplied yet, so a client passed to
     * {@link #setClient} or {@link #initIndexer} beforehand is not replaced.
     *
     * @param conf the configuration to use.
     */
    @Override
    public void setConf(final Configuration conf) {
        this.conf = conf;
        if (!isInit) {
            if (mongoClient == null) {
                setClient(MongoConnectorFactory.getMongoClient(conf));
            }
            init();
            // Record that initialization happened so later calls do not repeat it.
            isInit = true;
        }
    }

    /**
     * Closes the underlying client, if one has been set.
     *
     * @throws IOException declared by the interface; not thrown by this implementation.
     */
    @Override
    public void close() throws IOException {
        if (mongoClient != null) {
            mongoClient.close();
        }
    }

    /**
     * No-op: statements are written to the collection as they are stored.
     */
    @Override
    public void flush() throws IOException {
    }

    /**
     * @return the configuration last passed to {@link #setConf}, or
     *         {@code null} if none has been set.
     */
    @Override
    public Configuration getConf() {
        return conf;
    }

    /**
     * @return the MongoDB database name resolved by {@link #initCore()}.
     *         Note this is the database name, not the collection name.
     */
    @Override
    public String getTableName() {
        return dbName;
    }

    /**
     * @return the predicates this indexer is restricted to; an empty set
     *         means every predicate is indexed (see {@link #storeStatement}).
     */
    @Override
    public Set<URI> getIndexablePredicates() {
        return predicates;
    }

    /**
     * Removes the documents matching the given statement from the index collection.
     *
     * @param stmt the statement whose index entries should be removed.
     */
    @Override
    public void deleteStatement(final RyaStatement stmt) throws IOException {
        final DBObject query = storageStrategy.getQuery(stmt);
        collection.remove(query);
    }

    /**
     * Stores each statement in turn via {@link #storeStatement}.
     *
     * @param ryaStatements the statements to index.
     */
    @Override
    public void storeStatements(final Collection<RyaStatement> ryaStatements)
            throws IOException {
        for (final RyaStatement ryaStatement : ryaStatements) {
            storeStatement(ryaStatement);
        }
    }

    /**
     * Indexes a single statement.
     * <p>
     * The statement is stored only when its predicate is indexable (the
     * predicate set is empty or contains it), its object is a
     * {@link Literal}, and the storage strategy produces a non-null document.
     * An {@link IllegalArgumentException} raised while handling the statement
     * is logged and the statement is skipped.
     *
     * @param ryaStatement the statement to index.
     */
    @Override
    public void storeStatement(final RyaStatement ryaStatement) throws IOException {
        try {
            final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
            final boolean isValidPredicate = predicates.isEmpty() || predicates.contains(statement.getPredicate());
            if (isValidPredicate && (statement.getObject() instanceof Literal)) {
                final DBObject obj = storageStrategy.serialize(ryaStatement);
                if (obj != null) {
                    collection.insert(obj, WriteConcern.ACKNOWLEDGED);
                }
            }
        } catch (final IllegalArgumentException e) {
            // Pass the exception along so the cause and stack trace are not lost.
            LOG.error("Unable to parse the statement: " + ryaStatement.toString(), e);
        }
    }

    /**
     * Not supported by this indexer.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    public void dropGraph(final RyaURI... graphs) {
        throw new UnsupportedOperationException();
    }

    /**
     * Queries the index collection for documents that satisfy both the
     * caller-supplied query and the query derived from the statement constraints.
     *
     * @param constraints the statement constraints, translated by the storage strategy.
     * @param preConstraints an additional query that must also match.
     * @return an iteration over the matching statements; the caller should close it.
     */
    protected CloseableIteration<Statement, QueryEvaluationException> withConstraints(final StatementConstraints constraints, final DBObject preConstraints) {
        final DBObject constraintQuery = storageStrategy.getQuery(constraints);
        final DBObject dbo = QueryBuilder.start().and(preConstraints).and(constraintQuery).get();
        return closableIterationFromCursor(dbo);
    }

    /**
     * Runs the query and adapts the resulting cursor to a
     * {@link CloseableIteration} of statements.
     *
     * @param dbo the query to run against the index collection.
     * @return an iteration that deserializes each document into a statement.
     */
    private CloseableIteration<Statement, QueryEvaluationException> closableIterationFromCursor(final DBObject dbo) {
        final DBCursor cursor = collection.find(dbo);
        return new CloseableIteration<Statement, QueryEvaluationException>() {
            @Override
            public boolean hasNext() {
                return cursor.hasNext();
            }

            @Override
            public Statement next() throws QueryEvaluationException {
                final DBObject document = cursor.next();
                return RyaToRdfConversions.convertStatement(storageStrategy.deserializeDBObject(document));
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Remove not implemented");
            }

            @Override
            public void close() throws QueryEvaluationException {
                cursor.close();
            }
        };
    }

    /**
     * @return The name of the {@link DBCollection} to use with the storage strategy.
     */
    public abstract String getCollectionName();
}