blob: d01c2d7554ab51934b214b4747163b3d94c48de5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.mongodb;
import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.persist.RyaDAO;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.persist.RyaNamespaceManager;
import org.apache.rya.api.persist.index.RyaSecondaryIndexer;
import org.apache.rya.api.persist.query.RyaQueryEngine;
import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
import org.apache.rya.mongodb.batch.collection.DbCollectionType;
import org.apache.rya.mongodb.dao.MongoDBNamespaceManager;
import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
import org.apache.rya.mongodb.dao.SimpleMongoDBNamespaceManager;
import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
import org.apache.rya.mongodb.document.util.DocumentVisibilityUtil;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.DuplicateKeyException;
import com.mongodb.MongoClient;
/**
* Default DAO for mongo backed RYA allowing for CRUD operations.
*/
public final class MongoDBRyaDAO implements RyaDAO<StatefulMongoDBRdfConfiguration>{
private static final Logger log = Logger.getLogger(MongoDBRyaDAO.class);
private final AtomicBoolean isInitialized = new AtomicBoolean();
private final AtomicBoolean flushEachUpdate = new AtomicBoolean(true);
private StatefulMongoDBRdfConfiguration conf;
private MongoClient mongoClient;
private DB db;
private DBCollection coll;
private MongoDBQueryEngine queryEngine;
private MongoDBStorageStrategy<RyaStatement> storageStrategy;
private MongoDBNamespaceManager nameSpaceManager;
private List<MongoSecondaryIndex> secondaryIndexers;
private Authorizations auths;
private MongoDbBatchWriter<DBObject> mongoDbBatchWriter;
@Override
public synchronized void setConf(final StatefulMongoDBRdfConfiguration conf) {
this.conf = requireNonNull(conf);
mongoClient = this.conf.getMongoClient();
auths = conf.getAuthorizations();
flushEachUpdate.set(conf.flushEachUpdate());
}
public void setDB(final DB db) {
this.db = db;
}
public void setDBCollection(final DBCollection coll) {
this.coll = coll;
}
@Override
public synchronized StatefulMongoDBRdfConfiguration getConf() {
return conf;
}
@Override
public void init() throws RyaDAOException {
if (isInitialized.get()) {
return;
}
secondaryIndexers = conf.getAdditionalIndexers();
for(final MongoSecondaryIndex index: secondaryIndexers) {
index.setConf(conf);
}
db = mongoClient.getDB(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME));
coll = db.getCollection(conf.getTriplesCollectionName());
nameSpaceManager = new SimpleMongoDBNamespaceManager(db.getCollection(conf.getNameSpacesCollectionName()));
queryEngine = new MongoDBQueryEngine();
queryEngine.setConf(conf);
storageStrategy = new SimpleMongoDBStorageStrategy();
storageStrategy.createIndices(coll);
for(final MongoSecondaryIndex index: secondaryIndexers) {
index.init();
}
final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
mongoDbBatchWriter = new MongoDbBatchWriter<>(new DbCollectionType(coll), mongoDbBatchWriterConfig);
try {
mongoDbBatchWriter.start();
} catch (final MongoDbBatchWriterException e) {
throw new RyaDAOException("Error starting MongoDB batch writer", e);
}
isInitialized.set(true);
}
@Override
public boolean isInitialized() throws RyaDAOException {
return isInitialized.get();
}
@Override
public void destroy() throws RyaDAOException {
if (!isInitialized.get()) {
return;
}
isInitialized.set(false);
flush();
try {
mongoDbBatchWriter.shutdown();
} catch (final MongoDbBatchWriterException e) {
throw new RyaDAOException("Error shutting down MongoDB batch writer", e);
}
for(final MongoSecondaryIndex indexer : secondaryIndexers) {
try {
indexer.close();
} catch (final IOException e) {
log.error("Error closing indexer: " + indexer.getClass().getSimpleName(), e);
}
}
IOUtils.closeQuietly(queryEngine);
}
@Override
public void add(final RyaStatement statement) throws RyaDAOException {
// add it to the collection
try {
final boolean canAdd = DocumentVisibilityUtil.doesUserHaveDocumentAccess(auths, statement.getColumnVisibility());
if (canAdd) {
final DBObject obj = storageStrategy.serialize(statement);
try {
mongoDbBatchWriter.addObjectToQueue(obj);
if (flushEachUpdate.get()) {
flush();
}
} catch (final MongoDbBatchWriterException e) {
throw new RyaDAOException("Error adding statement", e);
}
for(final RyaSecondaryIndexer index: secondaryIndexers) {
index.storeStatement(statement);
}
} else {
throw new RyaDAOException("User does not have the required authorizations to add statement");
}
} catch (final IOException e) {
log.error("Unable to add: " + statement.toString());
throw new RyaDAOException(e);
}
catch (final DuplicateKeyException e){
log.error("Attempting to load duplicate triple: " + statement.toString());
}
}
@Override
public void add(final Iterator<RyaStatement> statementIter) throws RyaDAOException {
final List<DBObject> dbInserts = new ArrayList<>();
while (statementIter.hasNext()){
final RyaStatement ryaStatement = statementIter.next();
final boolean canAdd = DocumentVisibilityUtil.doesUserHaveDocumentAccess(auths, ryaStatement.getColumnVisibility());
if (canAdd) {
final DBObject insert = storageStrategy.serialize(ryaStatement);
dbInserts.add(insert);
try {
for (final RyaSecondaryIndexer index : secondaryIndexers) {
index.storeStatement(ryaStatement);
}
} catch (final IOException e) {
log.error("Failed to add: " + ryaStatement.toString() + " to the indexer");
}
} else {
throw new RyaDAOException("User does not have the required authorizations to add statement");
}
}
try {
mongoDbBatchWriter.addObjectsToQueue(dbInserts);
if (flushEachUpdate.get()) {
flush();
}
} catch (final MongoDbBatchWriterException e) {
throw new RyaDAOException("Error adding statements", e);
}
}
@Override
public void delete(final RyaStatement statement, final StatefulMongoDBRdfConfiguration conf)
throws RyaDAOException {
final boolean canDelete = DocumentVisibilityUtil.doesUserHaveDocumentAccess(auths, statement.getColumnVisibility());
if (canDelete) {
final DBObject obj = storageStrategy.getQuery(statement);
coll.remove(obj);
for (final RyaSecondaryIndexer index : secondaryIndexers) {
try {
index.deleteStatement(statement);
} catch (final IOException e) {
log.error("Unable to remove statement: " + statement.toString() + " from secondary indexer: " + index.getTableName(), e);
}
}
} else {
throw new RyaDAOException("User does not have the required authorizations to delete statement");
}
}
@Override
public void dropGraph(final StatefulMongoDBRdfConfiguration conf, final RyaURI... graphs)
throws RyaDAOException {
}
@Override
public void delete(final Iterator<RyaStatement> statements,
final StatefulMongoDBRdfConfiguration conf) throws RyaDAOException {
while (statements.hasNext()){
final RyaStatement ryaStatement = statements.next();
final boolean canDelete = DocumentVisibilityUtil.doesUserHaveDocumentAccess(auths, ryaStatement.getColumnVisibility());
if (canDelete) {
coll.remove(storageStrategy.getQuery(ryaStatement));
for (final RyaSecondaryIndexer index : secondaryIndexers) {
try {
index.deleteStatement(ryaStatement);
} catch (final IOException e) {
log.error("Unable to remove statement: " + ryaStatement.toString() + " from secondary indexer: " + index.getTableName(), e);
}
}
} else {
throw new RyaDAOException("User does not have the required authorizations to delete statement");
}
}
}
@Override
public String getVersion() throws RyaDAOException {
return "1.0";
}
@Override
public RyaQueryEngine<StatefulMongoDBRdfConfiguration> getQueryEngine() {
return queryEngine;
}
@Override
public RyaNamespaceManager<StatefulMongoDBRdfConfiguration> getNamespaceManager() {
return nameSpaceManager;
}
@Override
public void purge(final RdfCloudTripleStoreConfiguration configuration) {
}
@Override
public void dropAndDestroy() throws RyaDAOException {
db.dropDatabase(); // this is dangerous!
}
@Override
public void flush() throws RyaDAOException {
try {
mongoDbBatchWriter.flush();
flushIndexers();
} catch (final MongoDbBatchWriterException e) {
throw new RyaDAOException("Error flushing data.", e);
}
}
private void flushIndexers() throws RyaDAOException {
for (final MongoSecondaryIndex indexer : secondaryIndexers) {
try {
indexer.flush();
} catch (final IOException e) {
log.error("Error flushing data in indexer: " + indexer.getClass().getSimpleName(), e);
}
}
}
}