| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.rya.mongodb.aggregation; |
| |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT; |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY; |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT; |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH; |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_LANGUAGE; |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE; |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE; |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH; |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA; |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT; |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH; |
| import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP; |
| |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NavigableSet; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentSkipListSet; |
| import java.util.function.Function; |
| |
| import org.apache.rya.api.domain.RyaIRI; |
| import org.apache.rya.api.domain.RyaStatement; |
| import org.apache.rya.api.domain.RyaType; |
| import org.apache.rya.api.domain.StatementMetadata; |
| import org.apache.rya.api.resolver.RdfToRyaConversions; |
| import org.apache.rya.mongodb.MongoDbRdfConstants; |
| import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; |
| import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy; |
| import org.apache.rya.mongodb.document.operators.query.ConditionalOperators; |
| import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter; |
| import org.bson.Document; |
| import org.bson.conversions.Bson; |
| import org.eclipse.rdf4j.common.iteration.CloseableIteration; |
| import org.eclipse.rdf4j.model.IRI; |
| import org.eclipse.rdf4j.model.Literal; |
| import org.eclipse.rdf4j.model.Resource; |
| import org.eclipse.rdf4j.model.Value; |
| import org.eclipse.rdf4j.model.vocabulary.XMLSchema; |
| import org.eclipse.rdf4j.query.BindingSet; |
| import org.eclipse.rdf4j.query.QueryEvaluationException; |
| import org.eclipse.rdf4j.query.algebra.Compare; |
| import org.eclipse.rdf4j.query.algebra.ExtensionElem; |
| import org.eclipse.rdf4j.query.algebra.ProjectionElem; |
| import org.eclipse.rdf4j.query.algebra.ProjectionElemList; |
| import org.eclipse.rdf4j.query.algebra.StatementPattern; |
| import org.eclipse.rdf4j.query.algebra.ValueConstant; |
| import org.eclipse.rdf4j.query.algebra.ValueExpr; |
| import org.eclipse.rdf4j.query.algebra.Var; |
| import org.eclipse.rdf4j.query.algebra.evaluation.impl.ExternalSet; |
| |
| import com.google.common.base.Objects; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.BiMap; |
| import com.google.common.collect.HashBiMap; |
| import com.mongodb.BasicDBObject; |
| import com.mongodb.DBObject; |
| import com.mongodb.client.MongoCollection; |
| import com.mongodb.client.model.Aggregates; |
| import com.mongodb.client.model.BsonField; |
| import com.mongodb.client.model.Filters; |
| import com.mongodb.client.model.Projections; |
| |
| /** |
| * Represents a portion of a query tree as a MongoDB aggregation pipeline. Should |
| * be built bottom-up: start with a statement pattern implemented as a $match |
| * step, then add steps to the pipeline to handle higher levels of the query |
| * tree. Methods are provided to add certain supported query operations to the |
| * end of the internal pipeline. In some cases, specific arguments may be |
| * unsupported, in which case the pipeline is unchanged and the method returns |
| * false. |
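| * <p> |
| * A rough usage sketch (the collection, patterns, and binding set shown |
| * here are illustrative placeholders, not part of this class): |
| * <pre> |
| * StatementPattern sp = new StatementPattern( |
| *         new Var("s"), new Var("p"), new Var("o")); |
| * AggregationPipelineQueryNode node = |
| *         new AggregationPipelineQueryNode(collection, sp); |
| * if (node.joinWith(otherPattern)) { |
| *     // pipeline now joins both patterns; otherwise it is unchanged |
| * } |
| * CloseableIteration<BindingSet, QueryEvaluationException> results = |
| *         node.evaluate(EmptyBindingSet.getInstance()); |
| * </pre> |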
| */ |
| public class AggregationPipelineQueryNode extends ExternalSet { |
| /** |
| * An aggregation result corresponding to a solution should map this key |
| * to an object which itself maps variable names to variable values. |
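| * For example (illustrative, hash abbreviated), a solution binding ?x to |
| * an IRI might look like: |
| * <pre> |
| * { "<VALUES>": { "x": "urn:example:A" }, |
| *   "<HASHES>": { "x": "..." }, |
| *   "<TYPES>":  { "x": "http://www.w3.org/2001/XMLSchema#anyURI" } } |
| * </pre> |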
| */ |
| static final String VALUES = "<VALUES>"; |
| |
| /** |
| * An aggregation result corresponding to a solution should map this key |
| * to an object which itself maps variable names to the corresponding hashes |
| * of their values. |
| */ |
| static final String HASHES = "<HASHES>"; |
| |
| /** |
| * An aggregation result corresponding to a solution should map this key |
| * to an object which itself maps variable names to their datatypes, if any. |
| */ |
| static final String TYPES = "<TYPES>"; |
| |
| private static final String LEVEL = "derivation_level"; |
| private static final String[] FIELDS = { VALUES, HASHES, TYPES, LEVEL, TIMESTAMP }; |
| |
| private static final String JOINED_TRIPLE = "<JOINED_TRIPLE>"; |
| private static final String FIELDS_MATCH = "<JOIN_FIELDS_MATCH>"; |
| |
| private static final MongoDBStorageStrategy<RyaStatement> strategy = new SimpleMongoDBStorageStrategy(); |
| |
| private static final Bson DEFAULT_TYPE = new Document("$literal", XMLSchema.ANYURI.stringValue()); |
| private static final Bson DEFAULT_CONTEXT = new Document("$literal", ""); |
| private static final Bson DEFAULT_DV = DocumentVisibilityAdapter.toDBObject(MongoDbRdfConstants.EMPTY_DV); |
| private static final Bson DEFAULT_METADATA = new Document("$literal", |
| StatementMetadata.EMPTY_METADATA.toString()); |
| |
| private static boolean isValidFieldName(final String name) { |
| return !(name == null || name.contains(".") || name.contains("$") |
| || name.equals("_id")); |
| } |
| |
| /** |
| * For a given statement pattern, represents a mapping from query variables |
| * to their corresponding parts of matching triples. Variable names that |
| * contain invalid characters are replaced with temporary substitutes, and |
| * a map back to the original names is maintained. |
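| * For example, given the pattern { ?s, <urn:p>, ?o }, "s" maps to the |
| * subject value/hash fields and "o" to the object value/hash/type fields, |
| * while the constant predicate contributes no mapping. |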
| */ |
| private static class StatementVarMapping { |
| private final Map<String, String> varToTripleValue = new HashMap<>(); |
| private final Map<String, String> varToTripleHash = new HashMap<>(); |
| private final Map<String, String> varToTripleType = new HashMap<>(); |
| private final BiMap<String, String> varToOriginalName; |
| |
| String valueField(final String varName) { |
| return varToTripleValue.get(varName); |
| } |
| String hashField(final String varName) { |
| return varToTripleHash.get(varName); |
| } |
| String typeField(final String varName) { |
| return varToTripleType.get(varName); |
| } |
| |
| Set<String> varNames() { |
| return varToTripleValue.keySet(); |
| } |
| |
| private String replace(final String original) { |
| if (varToOriginalName.containsValue(original)) { |
| return varToOriginalName.inverse().get(original); |
| } |
| else { |
| final String replacement = "field-" + UUID.randomUUID(); |
| varToOriginalName.put(replacement, original); |
| return replacement; |
| } |
| } |
| |
| private String sanitize(final String name) { |
| if (varToOriginalName.containsValue(name)) { |
| return varToOriginalName.inverse().get(name); |
| } |
| else if (name != null && !isValidFieldName(name)) { |
| return replace(name); |
| } |
| return name; |
| } |
| |
| StatementVarMapping(final StatementPattern sp, final BiMap<String, String> varToOriginalName) { |
| this.varToOriginalName = varToOriginalName; |
| if (sp.getSubjectVar() != null && !sp.getSubjectVar().hasValue()) { |
| final String name = sanitize(sp.getSubjectVar().getName()); |
| varToTripleValue.put(name, SUBJECT); |
| varToTripleHash.put(name, SUBJECT_HASH); |
| } |
| if (sp.getPredicateVar() != null && !sp.getPredicateVar().hasValue()) { |
| final String name = sanitize(sp.getPredicateVar().getName()); |
| varToTripleValue.put(name, PREDICATE); |
| varToTripleHash.put(name, PREDICATE_HASH); |
| } |
| if (sp.getObjectVar() != null && !sp.getObjectVar().hasValue()) { |
| final String name = sanitize(sp.getObjectVar().getName()); |
| varToTripleValue.put(name, OBJECT); |
| varToTripleHash.put(name, OBJECT_HASH); |
| varToTripleType.put(name, OBJECT_TYPE); |
| } |
| if (sp.getContextVar() != null && !sp.getContextVar().hasValue()) { |
| final String name = sanitize(sp.getContextVar().getName()); |
| varToTripleValue.put(name, CONTEXT); |
| } |
| } |
| |
| Bson getProjectExpression() { |
| return getProjectExpression(new LinkedList<>(), str -> "$" + str); |
| } |
| |
| Bson getProjectExpression(final Iterable<String> alsoInclude, |
| final Function<String, String> getFieldExpr) { |
| final Document values = new Document(); |
| final Document hashes = new Document(); |
| final Document types = new Document(); |
| for (final String varName : varNames()) { |
| values.append(varName, getFieldExpr.apply(valueField(varName))); |
| if (varToTripleHash.containsKey(varName)) { |
| hashes.append(varName, getFieldExpr.apply(hashField(varName))); |
| } |
| if (varToTripleType.containsKey(varName)) { |
| types.append(varName, getFieldExpr.apply(typeField(varName))); |
| } |
| } |
| for (final String varName : alsoInclude) { |
| values.append(varName, 1); |
| hashes.append(varName, 1); |
| types.append(varName, 1); |
| } |
| final List<Bson> fields = new LinkedList<>(); |
| fields.add(Projections.excludeId()); |
| fields.add(Projections.computed(VALUES, values)); |
| fields.add(Projections.computed(HASHES, hashes)); |
| if (!types.isEmpty()) { |
| fields.add(Projections.computed(TYPES, types)); |
| } |
| fields.add(Projections.computed(LEVEL, new Document("$max", |
| Arrays.asList("$" + LEVEL, getFieldExpr.apply(LEVEL), 0)))); |
| fields.add(Projections.computed(TIMESTAMP, new Document("$max", |
| Arrays.asList("$" + TIMESTAMP, getFieldExpr.apply(TIMESTAMP), 0)))); |
| return Projections.fields(fields); |
| } |
| } |
| |
| /** |
| * Given a StatementPattern, generate an object representing the arguments |
| * to a "$match" command that will find matching triples. |
| * @param sp The StatementPattern to search for |
| * @param path If given, specify the field that should be matched against |
| * the statement pattern, using an ordered list of field names for a nested |
| * field. E.g. to match records { "x": { "y": <statement pattern> } }, pass |
| * "x" followed by "y". |
| * @return The argument of a "$match" query |
| */ |
| private static BasicDBObject getMatchExpression(final StatementPattern sp, final String ... path) { |
| final Var subjVar = sp.getSubjectVar(); |
| final Var predVar = sp.getPredicateVar(); |
| final Var objVar = sp.getObjectVar(); |
| final Var contextVar = sp.getContextVar(); |
| RyaIRI s = null; |
| RyaIRI p = null; |
| RyaType o = null; |
| RyaIRI c = null; |
| if (subjVar != null && subjVar.getValue() instanceof Resource) { |
| s = RdfToRyaConversions.convertResource((Resource) subjVar.getValue()); |
| } |
| if (predVar != null && predVar.getValue() instanceof IRI) { |
| p = RdfToRyaConversions.convertIRI((IRI) predVar.getValue()); |
| } |
| if (objVar != null && objVar.getValue() != null) { |
| o = RdfToRyaConversions.convertValue(objVar.getValue()); |
| } |
| if (contextVar != null && contextVar.getValue() instanceof IRI) { |
| c = RdfToRyaConversions.convertIRI((IRI) contextVar.getValue()); |
| } |
| final RyaStatement rs = new RyaStatement(s, p, o, c); |
| final DBObject obj = strategy.getQuery(rs); |
| // Add path prefix, if given |
| if (path.length > 0) { |
| final StringBuilder sb = new StringBuilder(); |
| for (final String str : path) { |
| sb.append(str).append("."); |
| } |
| final String prefix = sb.toString(); |
| final Set<String> originalKeys = new HashSet<>(obj.keySet()); |
| originalKeys.forEach(key -> { |
| final Object value = obj.removeField(key); |
| obj.put(prefix + key, value); |
| }); |
| } |
| return (BasicDBObject) obj; |
| } |
| |
| private static String valueFieldExpr(final String varName) { |
| return "$" + VALUES + "." + varName; |
| } |
| private static String hashFieldExpr(final String varName) { |
| return "$" + HASHES + "." + varName; |
| } |
| private static String typeFieldExpr(final String varName) { |
| return "$" + TYPES + "." + varName; |
| } |
| private static String joinFieldExpr(final String triplePart) { |
| return "$" + JOINED_TRIPLE + "." + triplePart; |
| } |
| |
| /** |
| * Get an object representing the value field of some value expression, or |
| * return null if the expression isn't supported. |
| */ |
| private Object valueFieldExpr(final ValueExpr expr) { |
| if (expr instanceof Var) { |
| return valueFieldExpr(((Var) expr).getName()); |
| } |
| else if (expr instanceof ValueConstant) { |
| return new Document("$literal", ((ValueConstant) expr).getValue().stringValue()); |
| } |
| else { |
| return null; |
| } |
| } |
| |
| private final List<Bson> pipeline; |
| private final MongoCollection<Document> collection; |
| private final Set<String> assuredBindingNames; |
| private final Set<String> bindingNames; |
| private final BiMap<String, String> varToOriginalName; |
| |
| private String replace(final String original) { |
| if (varToOriginalName.containsValue(original)) { |
| return varToOriginalName.inverse().get(original); |
| } |
| else { |
| final String replacement = "field-" + UUID.randomUUID(); |
| varToOriginalName.put(replacement, original); |
| return replacement; |
| } |
| } |
| |
| /** |
| * Create a pipeline query node based on a StatementPattern. |
| * @param collection The collection of triples to query. |
| * @param baseSP The leaf node in the query tree. |
| */ |
| public AggregationPipelineQueryNode(final MongoCollection<Document> collection, final StatementPattern baseSP) { |
| this.collection = Preconditions.checkNotNull(collection); |
| Preconditions.checkNotNull(baseSP); |
| this.varToOriginalName = HashBiMap.create(); |
| final StatementVarMapping mapping = new StatementVarMapping(baseSP, varToOriginalName); |
| this.assuredBindingNames = new HashSet<>(mapping.varNames()); |
| this.bindingNames = new HashSet<>(mapping.varNames()); |
| this.pipeline = new LinkedList<>(); |
| this.pipeline.add(Aggregates.match(getMatchExpression(baseSP))); |
| this.pipeline.add(Aggregates.project(mapping.getProjectExpression())); |
| } |
| |
| AggregationPipelineQueryNode(final MongoCollection<Document> collection, |
| final List<Bson> pipeline, final Set<String> assuredBindingNames, |
| final Set<String> bindingNames, final BiMap<String, String> varToOriginalName) { |
| this.collection = Preconditions.checkNotNull(collection); |
| this.pipeline = Preconditions.checkNotNull(pipeline); |
| this.assuredBindingNames = Preconditions.checkNotNull(assuredBindingNames); |
| this.bindingNames = Preconditions.checkNotNull(bindingNames); |
| this.varToOriginalName = Preconditions.checkNotNull(varToOriginalName); |
| } |
| |
| @Override |
| public boolean equals(final Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o instanceof AggregationPipelineQueryNode) { |
| final AggregationPipelineQueryNode other = (AggregationPipelineQueryNode) o; |
| if (this.collection.equals(other.collection) |
| && this.assuredBindingNames.equals(other.assuredBindingNames) |
| && this.bindingNames.equals(other.bindingNames) |
| && this.varToOriginalName.equals(other.varToOriginalName) |
| && this.pipeline.size() == other.pipeline.size()) { |
| // Check pipeline steps for equality -- underlying types don't |
| // have well-behaved equals methods, so check for equivalent |
| // string representations. |
| for (int i = 0; i < this.pipeline.size(); i++) { |
| final Bson doc1 = this.pipeline.get(i); |
| final Bson doc2 = other.pipeline.get(i); |
| if (!doc1.toString().equals(doc2.toString())) { |
| return false; |
| } |
| } |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(collection, pipeline, assuredBindingNames, |
| bindingNames, varToOriginalName); |
| } |
| |
| @Override |
| public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings) |
| throws QueryEvaluationException { |
| return new PipelineResultIteration(collection.aggregate(pipeline), varToOriginalName, bindings); |
| } |
| |
| @Override |
| public Set<String> getAssuredBindingNames() { |
| final Set<String> names = new HashSet<>(); |
| for (final String name : assuredBindingNames) { |
| names.add(varToOriginalName.getOrDefault(name, name)); |
| } |
| return names; |
| } |
| |
| @Override |
| public Set<String> getBindingNames() { |
| final Set<String> names = new HashSet<>(); |
| for (final String name : bindingNames) { |
| names.add(varToOriginalName.getOrDefault(name, name)); |
| } |
| return names; |
| } |
| |
| @Override |
| public AggregationPipelineQueryNode clone() { |
| return new AggregationPipelineQueryNode(collection, |
| new LinkedList<>(pipeline), |
| new HashSet<>(assuredBindingNames), |
| new HashSet<>(bindingNames), |
| HashBiMap.create(varToOriginalName)); |
| } |
| |
| @Override |
| public String getSignature() { |
| final Set<String> assured = getAssuredBindingNames(); |
| final Set<String> any = getBindingNames(); |
| final StringBuilder sb = new StringBuilder("AggregationPipelineQueryNode (binds: "); |
| sb.append(String.join(", ", assured)); |
| if (any.size() > assured.size()) { |
| final Set<String> optionalBindingNames = new HashSet<>(any); |
| optionalBindingNames.removeAll(assured); |
| sb.append(" [") |
| .append(String.join(", ", optionalBindingNames)) |
| .append("]"); |
| } |
| sb.append(")\n"); |
| for (final Bson doc : pipeline) { |
| sb.append(doc.toString()).append("\n"); |
| } |
| return sb.toString(); |
| } |
| |
| /** |
| * Get the internal list of aggregation pipeline steps. Note that documents |
| * resulting from this pipeline will be structured using an internal |
| * intermediate representation. For documents representing triples, see |
| * {@link #getTriplePipeline}, and for query solutions, see |
| * {@link #evaluate}. |
| * @return The current internal pipeline. |
| */ |
| List<Bson> getPipeline() { |
| return pipeline; |
| } |
| |
| /** |
| * Add a join with an individual {@link StatementPattern} to the pipeline. |
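| * <p> |
| * Roughly, this appends the following stages (a sketch; the exact |
| * documents are built by the driver): |
| * <pre> |
| * { $lookup: { from: <triple collection>, localField: "<HASHES>.<var>", |
| *              foreignField: <hash field>, as: "<JOINED_TRIPLE>" } } |
| * { $unwind: "$<JOINED_TRIPLE>" } |
| * { $match: <statement pattern and remaining shared variables> } |
| * { $project: <rename joined fields to variable names> } |
| * </pre> |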
| * @param sp The statement pattern to join with |
| * @return true if the join was successfully added to the pipeline. |
| */ |
| public boolean joinWith(final StatementPattern sp) { |
| Preconditions.checkNotNull(sp); |
| // 1. Determine shared variables and new variables |
| final StatementVarMapping spMap = new StatementVarMapping(sp, varToOriginalName); |
| final NavigableSet<String> sharedVars = new ConcurrentSkipListSet<>(spMap.varNames()); |
| sharedVars.retainAll(assuredBindingNames); |
| // 2. Join on one shared variable |
| final String joinKey = sharedVars.pollFirst(); |
| final String collectionName = collection.getNamespace().getCollectionName(); |
| if (joinKey == null) { |
| // No shared variable to join on; leave the pipeline unchanged |
| return false; |
| } |
| final Bson join = Aggregates.lookup(collectionName, |
| HASHES + "." + joinKey, |
| spMap.hashField(joinKey), |
| JOINED_TRIPLE); |
| pipeline.add(join); |
| // 3. Unwind the joined triples so each document represents a binding |
| // set (solution) from the base branch and a triple that may match. |
| pipeline.add(Aggregates.unwind("$" + JOINED_TRIPLE)); |
| // 4. (Optional) If there are any shared variables that weren't used as |
| // the join key, project all existing fields plus a new field that |
| // tests the equality of those shared variables. |
| final BasicDBObject matchOpts = getMatchExpression(sp, JOINED_TRIPLE); |
| if (!sharedVars.isEmpty()) { |
| final List<Bson> eqTests = new LinkedList<>(); |
| for (final String varName : sharedVars) { |
| final String oldField = valueFieldExpr(varName); |
| final String newField = joinFieldExpr(spMap.valueField(varName)); |
| final Bson eqTest = new Document("$eq", Arrays.asList(oldField, newField)); |
| eqTests.add(eqTest); |
| } |
| final Bson eqProjectOpts = Projections.fields( |
| Projections.computed(FIELDS_MATCH, Filters.and(eqTests)), |
| Projections.include(JOINED_TRIPLE, VALUES, HASHES, TYPES, LEVEL, TIMESTAMP)); |
| pipeline.add(Aggregates.project(eqProjectOpts)); |
| matchOpts.put(FIELDS_MATCH, true); |
| } |
| // 5. Filter for solutions whose triples match the joined statement |
| // pattern, and, if applicable, whose additional shared variables |
| // match the current solution. |
| pipeline.add(Aggregates.match(matchOpts)); |
| // 6. Project the results to include variables from the new SP (with |
| // appropriate renaming) and variables referenced only in the base |
| // pipeline (with previous names). |
| final Bson finalProjectOpts = spMap.getProjectExpression( |
| assuredBindingNames, str -> joinFieldExpr(str)); |
| assuredBindingNames.addAll(spMap.varNames()); |
| bindingNames.addAll(spMap.varNames()); |
| pipeline.add(Aggregates.project(finalProjectOpts)); |
| return true; |
| } |
| |
| /** |
| * Add a SPARQL projection or multi-projection operation to the pipeline. |
| * The number of documents produced by the pipeline after this operation |
| * will be the number of documents entering this stage (the number of |
| * intermediate results) multiplied by the number of |
| * {@link ProjectionElemList}s supplied here. Empty projections are |
| * unsupported; if any of the given projections binds zero variables, then |
| * the pipeline will be unchanged and the method will return false. |
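| * <p> |
| * For example, a multi-projection corresponding to a SPARQL CONSTRUCT |
| * template with two triple patterns projects each intermediate result |
| * twice, once per ProjectionElemList, doubling the document count. |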
| * @param projections One or more projections, i.e. mappings from the result |
| * at this stage of the query into a set of variables. |
| * @return true if the projection(s) were added to the pipeline. |
| */ |
| public boolean project(final Iterable<ProjectionElemList> projections) { |
| if (projections == null || !projections.iterator().hasNext()) { |
| return false; |
| } |
| final List<Bson> projectOpts = new LinkedList<>(); |
| final Set<String> bindingNamesUnion = new HashSet<>(); |
| Set<String> bindingNamesIntersection = null; |
| for (final ProjectionElemList projection : projections) { |
| if (projection.getElements().isEmpty()) { |
| // Empty projections are unsupported -- fail when seen |
| return false; |
| } |
| final Document valueDoc = new Document(); |
| final Document hashDoc = new Document(); |
| final Document typeDoc = new Document(); |
| final Set<String> projectionBindingNames = new HashSet<>(); |
| for (final ProjectionElem elem : projection.getElements()) { |
| String to = elem.getTargetName(); |
| // If the 'to' name is invalid, replace it internally |
| if (!isValidFieldName(to)) { |
| to = replace(to); |
| } |
| String from = elem.getSourceName(); |
| // If the 'from' name was renamed internally, use the replacement name |
| if (varToOriginalName.containsValue(from)) { |
| from = varToOriginalName.inverse().get(from); |
| } |
| projectionBindingNames.add(to); |
| if (to.equals(from)) { |
| valueDoc.append(to, 1); |
| hashDoc.append(to, 1); |
| typeDoc.append(to, 1); |
| } |
| else { |
| valueDoc.append(to, valueFieldExpr(from)); |
| hashDoc.append(to, hashFieldExpr(from)); |
| typeDoc.append(to, typeFieldExpr(from)); |
| } |
| } |
| bindingNamesUnion.addAll(projectionBindingNames); |
| if (bindingNamesIntersection == null) { |
| bindingNamesIntersection = new HashSet<>(projectionBindingNames); |
| } |
| else { |
| bindingNamesIntersection.retainAll(projectionBindingNames); |
| } |
| projectOpts.add(new Document() |
| .append(VALUES, valueDoc) |
| .append(HASHES, hashDoc) |
| .append(TYPES, typeDoc) |
| .append(LEVEL, "$" + LEVEL) |
| .append(TIMESTAMP, "$" + TIMESTAMP)); |
| } |
| if (projectOpts.size() == 1) { |
| pipeline.add(Aggregates.project(projectOpts.get(0))); |
| } |
| else { |
| final String listKey = "PROJECTIONS"; |
| final Bson projectIndividual = Projections.fields( |
| Projections.computed(VALUES, "$" + listKey + "." + VALUES), |
| Projections.computed(HASHES, "$" + listKey + "." + HASHES), |
| Projections.computed(TYPES, "$" + listKey + "." + TYPES), |
| Projections.include(LEVEL), |
| Projections.include(TIMESTAMP)); |
| pipeline.add(Aggregates.project(Projections.computed(listKey, projectOpts))); |
| pipeline.add(Aggregates.unwind("$" + listKey)); |
| pipeline.add(Aggregates.project(projectIndividual)); |
| } |
| assuredBindingNames.clear(); |
| bindingNames.clear(); |
| assuredBindingNames.addAll(bindingNamesIntersection); |
| bindingNames.addAll(bindingNamesUnion); |
| return true; |
| } |
| |
| /** |
| * Add a SPARQL extension to the pipeline, if possible. An extension adds |
| * some number of variables to the result. This adds a "$project" step to |
| * the pipeline, which differs from the SPARQL projection operation in that |
| * 1) pre-existing variables are always kept, and 2) values of new variables |
| * are defined by expressions, which may be more complex than simply |
| * variable names. Not all expressions are supported. If unsupported |
| * expression types are used in the extension, the pipeline will remain |
| * unchanged and this method will return false. |
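| * <p> |
| * For example (illustrative), an extension element binding ?labelCopy to |
| * the variable ?label adds computed fields copying "$<VALUES>.label", |
| * while a constant yields $literal value, hash, and datatype fields. |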
| * @param extensionElements A list of new variables and their expressions |
| * @return True if the extension was successfully converted into a pipeline |
| * step, false otherwise. |
| */ |
| public boolean extend(final Iterable<ExtensionElem> extensionElements) { |
| final List<Bson> valueFields = new LinkedList<>(); |
| final List<Bson> hashFields = new LinkedList<>(); |
| final List<Bson> typeFields = new LinkedList<>(); |
| for (final String varName : bindingNames) { |
| valueFields.add(Projections.include(varName)); |
| hashFields.add(Projections.include(varName)); |
| typeFields.add(Projections.include(varName)); |
| } |
| final Set<String> newVarNames = new HashSet<>(); |
| for (final ExtensionElem elem : extensionElements) { |
| String name = elem.getName(); |
| if (!isValidFieldName(name)) { |
| // If the field name is invalid, replace it internally |
| name = replace(name); |
| } |
| // We can only handle certain kinds of value expressions; return |
| // failure for any others. |
| final ValueExpr expr = elem.getExpr(); |
| final Object valueField; |
| final Object hashField; |
| final Object typeField; |
| if (expr instanceof Var) { |
| final String varName = ((Var) expr).getName(); |
| valueField = valueFieldExpr(varName); |
| hashField = hashFieldExpr(varName); |
| typeField = typeFieldExpr(varName); |
| } |
| else if (expr instanceof ValueConstant) { |
| final Value val = ((ValueConstant) expr).getValue(); |
| valueField = new Document("$literal", val.stringValue()); |
| hashField = new Document("$literal", SimpleMongoDBStorageStrategy.hash(val.stringValue())); |
| if (val instanceof Literal) { |
| typeField = new Document("$literal", ((Literal) val).getDatatype().stringValue()); |
| } |
| else { |
| typeField = null; |
| } |
| } |
| else { |
| // if not understood, return failure |
| return false; |
| } |
| valueFields.add(Projections.computed(name, valueField)); |
| hashFields.add(Projections.computed(name, hashField)); |
| if (typeField != null) { |
| typeFields.add(Projections.computed(name, typeField)); |
| } |
| newVarNames.add(name); |
| } |
| assuredBindingNames.addAll(newVarNames); |
| bindingNames.addAll(newVarNames); |
| final Bson projectOpts = Projections.fields( |
| Projections.computed(VALUES, Projections.fields(valueFields)), |
| Projections.computed(HASHES, Projections.fields(hashFields)), |
| Projections.computed(TYPES, Projections.fields(typeFields)), |
| Projections.include(LEVEL), |
| Projections.include(TIMESTAMP)); |
| pipeline.add(Aggregates.project(projectOpts)); |
| return true; |
| } |
| |
| /** |
| * Add a SPARQL filter to the pipeline, if possible. A filter eliminates |
| * results that don't satisfy a given condition. Not all conditional |
| * expressions are supported. If unsupported expressions are used in the |
| * filter, the pipeline will remain unchanged and this method will return |
| * false. Currently only supports binary {@link Compare} conditions among |
| * variables and/or literals. |
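| * <p> |
| * For example, FILTER(?x = ?y) appends (a sketch): |
| * <pre> |
| * { $project: { FILTER: { $eq: [ "$<VALUES>.x", "$<VALUES>.y" ] }, ... } } |
| * { $match: { FILTER: true } } |
| * { $project: <drop the FILTER field> } |
| * </pre> |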
| * @param condition The filter condition |
| * @return True if the filter was successfully converted into a pipeline |
| * step, false otherwise. |
| */ |
| public boolean filter(final ValueExpr condition) { |
| if (condition instanceof Compare) { |
| final Compare compare = (Compare) condition; |
| final Compare.CompareOp operator = compare.getOperator(); |
| final Object leftArg = valueFieldExpr(compare.getLeftArg()); |
| final Object rightArg = valueFieldExpr(compare.getRightArg()); |
| if (leftArg == null || rightArg == null) { |
| // unsupported value expression, can't convert filter |
| return false; |
| } |
| final String opFunc; |
| switch (operator) { |
| case EQ: |
| opFunc = "$eq"; |
| break; |
| case NE: |
| opFunc = "$ne"; |
| break; |
| case LT: |
| opFunc = "$lt"; |
| break; |
| case LE: |
| opFunc = "$lte"; |
| break; |
| case GT: |
| opFunc = "$gt"; |
| break; |
| case GE: |
| opFunc = "$gte"; |
| break; |
| default: |
| // unrecognized comparison operator, can't convert filter |
| return false; |
| } |
| final Document compareDoc = new Document(opFunc, Arrays.asList(leftArg, rightArg)); |
| pipeline.add(Aggregates.project(Projections.fields( |
| Projections.computed("FILTER", compareDoc), |
| Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP)))); |
| pipeline.add(Aggregates.match(new Document("FILTER", true))); |
| pipeline.add(Aggregates.project(Projections.fields( |
| Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP)))); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Add a $group step to filter out redundant solutions. |
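| * Groups on the concatenation of every bound variable's hash and keeps |
| * the first document for each, e.g. (sketch, two variables): |
| * <pre> |
| * { $group: { _id: { $concat: [ "$<HASHES>.x", "$<HASHES>.y" ] }, |
| *             "<VALUES>": { $first: "$<VALUES>" }, ... } } |
| * </pre> |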
| * @return True if the distinct operation was successfully appended. |
| */ |
| public boolean distinct() { |
| final List<String> key = new LinkedList<>(); |
| for (final String varName : bindingNames) { |
| key.add(hashFieldExpr(varName)); |
| } |
| final List<BsonField> reduceOps = new LinkedList<>(); |
| for (final String field : FIELDS) { |
| reduceOps.add(new BsonField(field, new Document("$first", "$" + field))); |
| } |
| pipeline.add(Aggregates.group(new Document("$concat", key), reduceOps)); |
| return true; |
| } |
| |
| /** |
| * Add a step to the end of the current pipeline which prunes the results |
| * according to the recorded derivation level of their sources. At least one |
| * triple that was used to construct the result must have a derivation level |
| * at least as high as the parameter, indicating that it was derived via |
| * that many steps from the original data. (A value of zero is equivalent to |
| * input data that was not derived at all.) Use in conjunction with |
| * getTriplePipeline (which sets source level for generated triples) to |
| * avoid repeatedly deriving the same results. |
| * @param requiredLevel Required derivation depth. Reject a solution to the |
| * query if all of the triples involved in producing that solution have a |
| * lower derivation depth than this. If zero, does nothing. |
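| * For example, requiredLevel = 2 appends the step |
| * <pre> { $match: { derivation_level: { $gte: 2 } } } </pre> |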
| */ |
| public void requireSourceDerivationDepth(final int requiredLevel) { |
| if (requiredLevel > 0) { |
| pipeline.add(Aggregates.match(new Document(LEVEL, |
| new Document("$gte", requiredLevel)))); |
| } |
| } |
| |
| /** |
| * Add a step to the end of the current pipeline which prunes the results |
| * according to the timestamps of their sources. At least one triple that |
| * was used to construct the result must have a timestamp at least as |
| * recent as the parameter. Use in iterative applications to avoid deriving |
| * solutions that would have been generated in an earlier iteration. |
| * @param t Minimum required timestamp. Reject a solution to the query if |
| * all of the triples involved in producing that solution have an earlier |
| * timestamp than this. |
| */ |
| public void requireSourceTimestamp(final long t) { |
| pipeline.add(Aggregates.match(new Document(TIMESTAMP, |
| new Document("$gte", t)))); |
| } |
| |
| /** |
| * Given that the current state of the pipeline produces data that can be |
| * interpreted as triples, add a project step to map each result from the |
| * intermediate result structure to a structure that can be stored in the |
| * triple store. Does not modify the internal pipeline, which will still |
| * produce intermediate results suitable for query evaluation. |
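| * <p> |
| * A typical use (illustrative; assumes the driver's $out stage) writes |
| * the constructed triples back to the same collection: |
| * <pre> |
| * List<Bson> insertPipeline = |
| *         node.getTriplePipeline(System.currentTimeMillis(), true); |
| * insertPipeline.add(Aggregates.out(collectionName)); |
| * collection.aggregate(insertPipeline).toCollection(); |
| * </pre> |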
| * @param timestamp Attach this timestamp to the resulting triples. |
| * @param requireNew If true, add an additional step to check constructed |
| * triples against existing triples and only include new ones in the |
| * result. Adds a potentially expensive $lookup step. |
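| * @return A copy of the internal pipeline with steps appended that |
| * transform its results into triple-formatted documents. |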
| * @throws IllegalStateException if the results produced by the current |
| * pipeline do not have variable names allowing them to be interpreted as |
| * triples (i.e. "subject", "predicate", and "object"). |
| */ |
| public List<Bson> getTriplePipeline(final long timestamp, final boolean requireNew) { |
| if (!assuredBindingNames.contains(SUBJECT) |
| || !assuredBindingNames.contains(PREDICATE) |
| || !assuredBindingNames.contains(OBJECT)) { |
| throw new IllegalStateException("Current pipeline does not produce " |
| + "records that can be converted into triples.\n" |
| + "Required variable names: <" + SUBJECT + ", " + PREDICATE |
| + ", " + OBJECT + ">\nCurrent variable names: " |
| + assuredBindingNames); |
| } |
| final List<Bson> triplePipeline = new LinkedList<>(pipeline); |
| final List<Bson> fields = new LinkedList<>(); |
| fields.add(Projections.computed(SUBJECT, valueFieldExpr(SUBJECT))); |
| fields.add(Projections.computed(SUBJECT_HASH, hashFieldExpr(SUBJECT))); |
| fields.add(Projections.computed(PREDICATE, valueFieldExpr(PREDICATE))); |
| fields.add(Projections.computed(PREDICATE_HASH, hashFieldExpr(PREDICATE))); |
| fields.add(Projections.computed(OBJECT, valueFieldExpr(OBJECT))); |
| fields.add(Projections.computed(OBJECT_HASH, hashFieldExpr(OBJECT))); |
| fields.add(Projections.computed(OBJECT_TYPE, |
| ConditionalOperators.ifNull(typeFieldExpr(OBJECT), DEFAULT_TYPE))); |
| // The intermediate representation doesn't track language tags, so no |
| // object_language field is attached to constructed triples. |
| fields.add(Projections.computed(CONTEXT, DEFAULT_CONTEXT)); |
| fields.add(Projections.computed(STATEMENT_METADATA, DEFAULT_METADATA)); |
| fields.add(DEFAULT_DV); |
| fields.add(Projections.computed(TIMESTAMP, new Document("$literal", timestamp))); |
| fields.add(Projections.computed(LEVEL, new Document("$add", Arrays.asList("$" + LEVEL, 1)))); |
| triplePipeline.add(Aggregates.project(Projections.fields(fields))); |
| if (requireNew) { |
| // Prune any triples that already exist in the data store |
| final String collectionName = collection.getNamespace().getCollectionName(); |
| final Bson includeAll = Projections.include(SUBJECT, SUBJECT_HASH, |
| PREDICATE, PREDICATE_HASH, OBJECT, OBJECT_HASH, |
| OBJECT_TYPE, OBJECT_LANGUAGE, CONTEXT, STATEMENT_METADATA, |
| DOCUMENT_VISIBILITY, TIMESTAMP, LEVEL); |
| final List<Bson> eqTests = new LinkedList<>(); |
| eqTests.add(new Document("$eq", Arrays.asList("$$this." + PREDICATE_HASH, "$" + PREDICATE_HASH))); |
| eqTests.add(new Document("$eq", Arrays.asList("$$this." + OBJECT_HASH, "$" + OBJECT_HASH))); |
| final Bson redundantFilter = new Document("$filter", new Document("input", "$" + JOINED_TRIPLE) |
| .append("as", "this").append("cond", new Document("$and", eqTests))); |
| triplePipeline.add(Aggregates.lookup(collectionName, SUBJECT_HASH, |
| SUBJECT_HASH, JOINED_TRIPLE)); |
| final String numRedundant = "REDUNDANT"; |
| triplePipeline.add(Aggregates.project(Projections.fields(includeAll, |
| Projections.computed(numRedundant, new Document("$size", redundantFilter))))); |
| triplePipeline.add(Aggregates.match(Filters.eq(numRedundant, 0))); |
| triplePipeline.add(Aggregates.project(Projections.fields(includeAll))); |
| } |
| return triplePipeline; |
| } |
| } |