blob: 468e58190fd3fc1730a4a7e2f76e4f337cc34bbd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.export.mongo;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.export.api.metadata.MergeParentMetadata;
import org.apache.rya.export.api.metadata.ParentMetadataExistsException;
import org.apache.rya.export.api.store.AddStatementException;
import org.apache.rya.export.api.store.ContainsStatementException;
import org.apache.rya.export.api.store.RemoveStatementException;
import org.apache.rya.export.api.store.RyaStatementStore;
import org.apache.rya.export.api.store.UpdateStatementException;
import org.apache.rya.export.mongo.parent.MongoParentMetadataRepository;
import org.apache.rya.mongodb.MongoDBRyaDAO;
import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
/**
* Mongo implementation of {@link RyaStatementStore}. Allows for exporting and
* importing rya statements from MongoDB.
*/
public class MongoRyaStatementStore implements RyaStatementStore {
private static final Logger logger = LoggerFactory.getLogger(MongoRyaStatementStore.class);
public static final String TRIPLES_COLLECTION = "rya__triples";
public static final String METADATA_COLLECTION = "parent_metadata";
protected final SimpleMongoDBStorageStrategy adapter;
protected final MongoDatabase db;
private final String ryaInstanceName;
private final MongoClient client;
private final MongoDBRyaDAO dao;
private final MongoParentMetadataRepository parentMetadataRepo;
/**
* Creates a new {@link MongoRyaStatementStore}.
* @param client - The client to connect to Mongo. (not null)
* @param ryaInstance - The rya instance to connect to. (not null)
* @param dao - The {@link MongoDBRyaDAO} to use to access statements. (not null)
*/
public MongoRyaStatementStore(final MongoClient client, final String ryaInstance, final MongoDBRyaDAO dao) {
this.client = checkNotNull(client);
ryaInstanceName = checkNotNull(ryaInstance);
this.dao = checkNotNull(dao);
db = this.client.getDatabase(ryaInstanceName);
adapter = new SimpleMongoDBStorageStrategy();
parentMetadataRepo = new MongoParentMetadataRepository(client, ryaInstance);
}
@Override
public Iterator<RyaStatement> fetchStatements() {
final Document sortObj = new Document(TIMESTAMP, 1);
final MongoCollection<Document> coll = db.getCollection(TRIPLES_COLLECTION);
final List<RyaStatement> statements = new ArrayList<>();
try (final MongoCursor<Document> cur = coll.find().sort(sortObj).iterator()) {
while(cur.hasNext()) {
final RyaStatement statement = adapter.deserializeDocument(cur.next());
statements.add(statement);
}
}
return statements.iterator();
}
@Override
public void addStatement(final RyaStatement statement) throws AddStatementException {
try {
dao.add(statement);
} catch (final RyaDAOException e) {
throw new AddStatementException("Unable to add statement: '" + statement.toString() + "'", e);
}
}
@Override
public void removeStatement(final RyaStatement statement) throws RemoveStatementException {
try {
//mongo dao does not need a config to remove.
dao.delete(statement, null);
} catch (final RyaDAOException e) {
throw new RemoveStatementException("Unable to remove statement: '" + statement.toString() + "'", e);
}
}
@Override
public boolean containsStatement(final RyaStatement statement) throws ContainsStatementException {
final Document doc = adapter.serialize(statement);
return db.getCollection(TRIPLES_COLLECTION).countDocuments(doc) > 0;
}
/**
* @return The {@link MongoClient} to connect to mongo.
*/
public MongoClient getClient() {
return client;
}
@Override
public void updateStatement(final RyaStatement original, final RyaStatement update) throws UpdateStatementException {
//Since mongo does not support visibility, this does nothing for mongo.
//Do not want a throw a not-implemented exception since that could potentially
//break stuff.
}
@Override
public Optional<MergeParentMetadata> getParentMetadata() {
MergeParentMetadata metadata = null;
try {
metadata = parentMetadataRepo.get();
} catch (final Exception e) {
// Catching any exception to ensure we always return Optional.ofNullable(metadata).
// Logging at the debug level if exceptional behavior needs to be investigated while deployed.
logger.debug("Parent metadata missing or exceptional behavior occurred.", e);
}
return Optional.ofNullable(metadata);
}
@Override
public void setParentMetadata(final MergeParentMetadata metadata) throws ParentMetadataExistsException {
parentMetadataRepo.set(metadata);
}
}