| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.rya.indexing.pcj.storage.mongo; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static java.util.Objects.requireNonNull; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.rya.api.domain.RyaType; |
| import org.apache.rya.api.model.VisibilityBindingSet; |
| import org.apache.rya.api.resolver.RdfToRyaConversions; |
| import org.apache.rya.api.resolver.RyaToRdfConversions; |
| import org.apache.rya.api.utils.CloseableIterator; |
| import org.apache.rya.indexing.pcj.storage.PcjMetadata; |
| import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; |
| import org.apache.rya.indexing.pcj.storage.accumulo.PcjVarOrderFactory; |
| import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; |
| import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; |
| import org.bson.Document; |
| import org.bson.conversions.Bson; |
| import org.openrdf.model.URI; |
| import org.openrdf.model.Value; |
| import org.openrdf.model.impl.URIImpl; |
| import org.openrdf.query.BindingSet; |
| import org.openrdf.query.MalformedQueryException; |
| import org.openrdf.query.QueryEvaluationException; |
| import org.openrdf.query.QueryLanguage; |
| import org.openrdf.query.TupleQuery; |
| import org.openrdf.query.TupleQueryResult; |
| import org.openrdf.query.impl.MapBindingSet; |
| import org.openrdf.repository.RepositoryConnection; |
| import org.openrdf.repository.RepositoryException; |
| |
| import com.mongodb.MongoClient; |
| import com.mongodb.client.FindIterable; |
| import com.mongodb.client.MongoCollection; |
| import com.mongodb.util.JSON; |
| |
| /** |
| * Creates and modifies PCJs in MongoDB. PCJ's are stored as follows: |
| * |
| * <pre> |
| * <code> |
| * ----- PCJ Metadata Doc ----- |
| * { |
| * _id: [pcj_ID]_METADATA, |
| * sparql: [sparql query to match results], |
| * varOrders: [varOrder1, VarOrder2, ..., VarOrdern] |
| * cardinality: [number of results] |
| * } |
| * |
| * ----- PCJ Results Doc ----- |
| * { |
| * pcjId: [pcj_ID], |
| * visibilities: [visibilities] |
| * [binding_var1]: { |
| * uri: [type_uri], |
| * value: [value] |
| * } |
| * . |
| * . |
| * . |
| * [binding_varn]: { |
| * uri: [type_uri], |
| * value: [value] |
| * } |
| * } |
| * </code> |
| * </pre> |
| */ |
| public class MongoPcjDocuments { |
| public static final String PCJ_COLLECTION_NAME = "pcjs"; |
| |
| // metadata fields |
| public static final String CARDINALITY_FIELD = "cardinality"; |
| public static final String SPARQL_FIELD = "sparql"; |
| public static final String PCJ_METADATA_ID = "_id"; |
| public static final String VAR_ORDER_FIELD = "varOrders"; |
| |
| // pcj results fields |
| private static final String BINDING_VALUE = "value"; |
| private static final String BINDING_TYPE = "rdfType"; |
| private static final String VISIBILITIES_FIELD = "visibilities"; |
| private static final String PCJ_ID = "pcjId"; |
| |
| private final MongoCollection<Document> pcjCollection; |
| private static final PcjVarOrderFactory pcjVarOrderFactory = new ShiftVarOrderFactory(); |
| |
| /** |
| * Creates a new {@link MongoPcjDocuments}. |
| * @param client - The {@link MongoClient} to use to connect to mongo. |
| * @param ryaInstanceName - The rya instance to connect to. |
| */ |
| public MongoPcjDocuments(final MongoClient client, final String ryaInstanceName) { |
| requireNonNull(client); |
| requireNonNull(ryaInstanceName); |
| pcjCollection = client.getDatabase(ryaInstanceName).getCollection(PCJ_COLLECTION_NAME); |
| } |
| |
| private String makeMetadataID(final String pcjId) { |
| return pcjId + "_METADATA"; |
| } |
| |
| /** |
| * Creates a {@link Document} containing the metadata defining the PCj. |
| * |
| * @param pcjId - Uniquely identifies a PCJ within Rya. (not null) |
| * @param sparql - The sparql query the PCJ will use. |
| * @return The document built around the provided metadata. |
| * @throws PCJStorageException - Thrown when the sparql query is malformed. |
| */ |
| public Document makeMetadataDocument(final String pcjId, final String sparql) throws PCJStorageException { |
| requireNonNull(pcjId); |
| requireNonNull(sparql); |
| |
| final Set<VariableOrder> varOrders; |
| try { |
| varOrders = pcjVarOrderFactory.makeVarOrders(sparql); |
| } catch (final MalformedQueryException e) { |
| throw new PCJStorageException("Can not create the PCJ. The SPARQL is malformed.", e); |
| } |
| |
| return new Document() |
| .append(PCJ_METADATA_ID, makeMetadataID(pcjId)) |
| .append(SPARQL_FIELD, sparql) |
| .append(CARDINALITY_FIELD, 0) |
| .append(VAR_ORDER_FIELD, varOrders); |
| |
| } |
| |
| /** |
| * Creates a new PCJ based on the provided metadata. The initial pcj results |
| * will be empty. |
| * |
| * @param pcjId - Uniquely identifies a PCJ within Rya. |
| * @param sparql - The query the pcj is assigned to. |
| * @throws PCJStorageException - Thrown when the sparql query is malformed. |
| */ |
| public void createPcj(final String pcjId, final String sparql) throws PCJStorageException { |
| pcjCollection.insertOne(makeMetadataDocument(pcjId, sparql)); |
| } |
| |
| /** |
| * Creates a new PCJ document and populates it by scanning an instance of |
| * Rya for historic matches. |
| * <p> |
| * If any portion of this operation fails along the way, the partially |
| * create PCJ documents will be left in Mongo. |
| * |
| * @param ryaConn - Connects to the Rya that will be scanned. (not null) |
| * @param pcjId - Uniquely identifies a PCJ within Rya. (not null) |
| * @param sparql - The SPARQL query whose results will be loaded into the PCJ results document. (not null) |
| * @throws PCJStorageException The PCJ documents could not be create or the |
| * values from Rya were not able to be loaded into it. |
| */ |
| public void createAndPopulatePcj( |
| final RepositoryConnection ryaConn, |
| final String pcjId, |
| final String sparql) throws PCJStorageException { |
| checkNotNull(ryaConn); |
| checkNotNull(pcjId); |
| checkNotNull(sparql); |
| |
| // Create the PCJ document in Mongo. |
| createPcj(pcjId, sparql); |
| |
| // Load historic matches from Rya into the PCJ results document. |
| populatePcj(pcjId, ryaConn); |
| } |
| |
| /** |
| * Gets the {@link PcjMetadata} from a provided PCJ Id. |
| * |
| * @param pcjId - The Id of the PCJ to get from MongoDB. (not null) |
| * @return - The {@link PcjMetadata} of the Pcj specified. |
| * @throws PCJStorageException The PCJ metadata document does not exist. |
| */ |
| public PcjMetadata getPcjMetadata(final String pcjId) throws PCJStorageException { |
| requireNonNull(pcjId); |
| |
| // since query by ID, there will only be one. |
| final Document result = pcjCollection.find(new Document(PCJ_METADATA_ID, makeMetadataID(pcjId))).first(); |
| |
| if(result == null) { |
| throw new PCJStorageException("The PCJ: " + pcjId + " does not exist."); |
| } |
| |
| final String sparql = result.getString(SPARQL_FIELD); |
| final int cardinality = result.getInteger(CARDINALITY_FIELD, 0); |
| final List<List<String>> varOrders= (List<List<String>>) result.get(VAR_ORDER_FIELD); |
| final Set<VariableOrder> varOrder = new HashSet<>(); |
| for(final List<String> vars : varOrders) { |
| varOrder.add(new VariableOrder(vars)); |
| } |
| |
| return new PcjMetadata(sparql, cardinality, varOrder); |
| } |
| |
| /** |
| * Adds binding set results to a specific PCJ. |
| * |
| * @param pcjId - Uniquely identifies a PCJ within Rya. (not null) |
| * @param results - The binding set results. (not null) |
| */ |
| public void addResults(final String pcjId, final Collection<VisibilityBindingSet> results) { |
| checkNotNull(pcjId); |
| checkNotNull(results); |
| |
| final List<Document> pcjDocs = new ArrayList<>(); |
| for (final VisibilityBindingSet vbs : results) { |
| // each binding gets it's own doc. |
| final Document bindingDoc = new Document(PCJ_ID, pcjId); |
| vbs.forEach(binding -> { |
| final RyaType type = RdfToRyaConversions.convertValue(binding.getValue()); |
| bindingDoc.append(binding.getName(), |
| new Document() |
| .append(BINDING_TYPE, type.getDataType().stringValue()) |
| .append(BINDING_VALUE, type.getData()) |
| ); |
| }); |
| bindingDoc.append(VISIBILITIES_FIELD, vbs.getVisibility()); |
| pcjDocs.add(bindingDoc); |
| } |
| pcjCollection.insertMany(pcjDocs); |
| |
| // update cardinality in the metadata doc. |
| final int appendCardinality = pcjDocs.size(); |
| final Bson query = new Document(PCJ_METADATA_ID, makeMetadataID(pcjId)); |
| final Bson update = new Document("$inc", new Document(CARDINALITY_FIELD, appendCardinality)); |
| pcjCollection.updateOne(query, update); |
| } |
| |
| /** |
| * Purges all results from the PCJ results document with the provided Id. |
| * |
| * @param pcjId - The Id of the PCJ to purge. (not null) |
| */ |
| public void purgePcjs(final String pcjId) { |
| requireNonNull(pcjId); |
| |
| // remove every doc for the pcj, except the metadata |
| final Bson filter = new Document(PCJ_ID, pcjId); |
| pcjCollection.deleteMany(filter); |
| |
| // reset cardinality |
| final Bson query = new Document(PCJ_METADATA_ID, makeMetadataID(pcjId)); |
| final Bson update = new Document("$set", new Document(CARDINALITY_FIELD, 0)); |
| pcjCollection.updateOne(query, update); |
| } |
| |
| /** |
| * Scan Rya for results that solve the PCJ's query and store them in the PCJ |
| * document. |
| * <p> |
| * This method assumes the PCJ document has already been created. |
| * |
| * @param pcjId - The Id of the PCJ that will receive the results. (not null) |
| * @param ryaConn - A connection to the Rya store that will be queried to find results. (not null) |
| * @throws PCJStorageException If results could not be written to the PCJ results document, |
| * the PCJ results document does not exist, or the query that is being execute was malformed. |
| */ |
| public void populatePcj(final String pcjId, final RepositoryConnection ryaConn) throws PCJStorageException { |
| checkNotNull(pcjId); |
| checkNotNull(ryaConn); |
| |
| try { |
| // Fetch the query that needs to be executed from the PCJ metadata document. |
| final PcjMetadata pcjMetadata = getPcjMetadata(pcjId); |
| final String sparql = pcjMetadata.getSparql(); |
| |
| // Query Rya for results to the SPARQL query. |
| final TupleQuery query = ryaConn.prepareTupleQuery(QueryLanguage.SPARQL, sparql); |
| final TupleQueryResult results = query.evaluate(); |
| |
| // Load batches of 1000 of them at a time into the PCJ results document. |
| final Set<VisibilityBindingSet> batch = new HashSet<>(1000); |
| while(results.hasNext()) { |
| final VisibilityBindingSet bs = new VisibilityBindingSet(results.next()); |
| batch.add( bs ); |
| if(batch.size() == 1000) { |
| addResults(pcjId, batch); |
| batch.clear(); |
| } |
| } |
| |
| if(!batch.isEmpty()) { |
| addResults(pcjId, batch); |
| } |
| |
| } catch (RepositoryException | MalformedQueryException | QueryEvaluationException e) { |
| throw new PCJStorageException( |
| "Could not populate a PCJ document with Rya results for the pcj with Id: " + pcjId, e); |
| } |
| } |
| |
| /** |
| * List the document Ids of the PCJs that are stored in MongoDB |
| * for this instance of Rya. |
| * |
| * @return A list of pcj document Ids that hold PCJ index data for the current |
| * instance of Rya. |
| */ |
| public List<String> listPcjDocuments() { |
| final List<String> pcjIds = new ArrayList<>(); |
| |
| //This Bson string reads as: |
| //{} - no search criteria: find all |
| //{ _id: 1 } - only return the _id, which is the PCJ Id. |
| final FindIterable<Document> rez = pcjCollection.find((Bson) JSON.parse("{ }, { " + PCJ_METADATA_ID + ": 1 , _id: 0}")); |
| final Iterator<Document> iter = rez.iterator(); |
| while(iter.hasNext()) { |
| pcjIds.add(iter.next().get(PCJ_METADATA_ID).toString().replace("_METADATA", "")); |
| } |
| |
| return pcjIds; |
| } |
| |
| /** |
| * Returns all of the results of a PCJ. |
| * |
| * @param pcjId |
| * - The PCJ to get the results for. (not null) |
| * @return The authorized PCJ results. |
| */ |
| public CloseableIterator<BindingSet> listResults(final String pcjId) { |
| requireNonNull(pcjId); |
| |
| // get all results based on pcjId |
| return queryForBindings(new Document(PCJ_ID, pcjId)); |
| } |
| |
| /** |
| * Retrieves the stored {@link BindingSet} results for the provided pcjId. |
| * |
| * @param pcjId - The Id of the PCJ to retrieve results from. |
| * @param restrictionBindings - The collection of {@link BindingSet}s to restrict results. |
| * <p> |
| * Note: the result restrictions from {@link BindingSet}s are an OR |
| * over ANDS in that: <code> |
| * [ |
| * bindingset: binding AND binding AND binding, |
| * OR |
| * bindingset: binding AND binding AND binding, |
| * . |
| * . |
| * . |
| * OR |
| * bindingset: binding |
| * ] |
| * </code> |
| * @return |
| */ |
| public CloseableIterator<BindingSet> getResults(final String pcjId, final Collection<BindingSet> restrictionBindings) { |
| // empty bindings return all results. |
| if (restrictionBindings.size() == 1 && restrictionBindings.iterator().next().size() == 0) { |
| return listResults(pcjId); |
| } |
| |
| final Document query = new Document(PCJ_ID, pcjId); |
| final Document bindingSetDoc = new Document(); |
| final List<Document> bindingSetList = new ArrayList<>(); |
| restrictionBindings.forEach(bindingSet -> { |
| final Document bindingDoc = new Document(); |
| final List<Document> bindings = new ArrayList<>(); |
| bindingSet.forEach(binding -> { |
| final RyaType type = RdfToRyaConversions.convertValue(binding.getValue()); |
| final Document typeDoc = new Document() |
| .append(BINDING_TYPE, type.getDataType().stringValue()) |
| .append(BINDING_VALUE, type.getData()); |
| final Document bind = new Document(binding.getName(), typeDoc); |
| bindings.add(bind); |
| }); |
| bindingDoc.append("$and", bindings); |
| bindingSetList.add(bindingDoc); |
| }); |
| bindingSetDoc.append("$or", bindingSetList); |
| return queryForBindings(query); |
| } |
| |
| private CloseableIterator<BindingSet> queryForBindings(final Document query) { |
| final FindIterable<Document> rez = pcjCollection.find(query); |
| final Iterator<Document> resultsIter = rez.iterator(); |
| return new CloseableIterator<BindingSet>() { |
| @Override |
| public boolean hasNext() { |
| return resultsIter.hasNext(); |
| } |
| |
| @Override |
| public BindingSet next() { |
| final Document bs = resultsIter.next(); |
| final MapBindingSet binding = new MapBindingSet(); |
| for (final String key : bs.keySet()) { |
| if (key.equals(VISIBILITIES_FIELD)) { |
| // has auths, is a visibility binding set. |
| } else if (!key.equals("_id") && !key.equals(PCJ_ID)) { |
| // is the binding value. |
| final Document typeDoc = (Document) bs.get(key); |
| final URI dataType = new URIImpl(typeDoc.getString(BINDING_TYPE)); |
| final RyaType type = new RyaType(dataType, typeDoc.getString(BINDING_VALUE)); |
| final Value value = RyaToRdfConversions.convertValue(type); |
| binding.addBinding(key, value); |
| } |
| } |
| return binding; |
| } |
| |
| @Override |
| public void close() throws Exception { |
| } |
| }; |
| } |
| |
| /** |
| * Drops a pcj based on the PCJ Id. Removing the entire document from Mongo. |
| * |
| * @param pcjId - The identifier for the PCJ to remove. |
| */ |
| public void dropPcj(final String pcjId) { |
| purgePcjs(pcjId); |
| pcjCollection.deleteOne(new Document(PCJ_METADATA_ID, makeMetadataID(pcjId))); |
| } |
| } |