Updates for Mongo aggregation
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
index 6996fcd..fda8538 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
@@ -95,6 +95,8 @@
  * false.
  */
 public class AggregationPipelineQueryNode extends ExternalSet {
+    private static final long serialVersionUID = 1L;
+
     /**
      * An aggregation result corresponding to a solution should map this key
      * to an object which itself maps variable names to variable values.
@@ -230,8 +232,12 @@
             }
             final List<Bson> fields = new LinkedList<>();
             fields.add(Projections.excludeId());
-            fields.add(Projections.computed(VALUES, values));
-            fields.add(Projections.computed(HASHES, hashes));
+            if (!values.isEmpty()) {
+                fields.add(Projections.computed(VALUES, values));
+            }
+            if (!hashes.isEmpty()) {
+                fields.add(Projections.computed(HASHES, hashes));
+            }
             if (!types.isEmpty()) {
                 fields.add(Projections.computed(TYPES, types));
             }
@@ -778,8 +784,7 @@
      */
     public void requireSourceDerivationDepth(final int requiredLevel) {
         if (requiredLevel > 0) {
-            pipeline.add(Aggregates.match(new Document(LEVEL,
-                    new Document("$gte", requiredLevel))));
+            pipeline.add(Aggregates.match(Filters.gte(LEVEL, requiredLevel)));
         }
     }
 
@@ -794,8 +799,7 @@
      *  timestamp than this.
      */
     public void requireSourceTimestamp(final long t) {
-        pipeline.add(Aggregates.match(new Document(TIMESTAMP,
-                new Document("$gte", t))));
+        pipeline.add(Aggregates.match(Filters.gte(TIMESTAMP, t)));
     }
 
     /**
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
index 462da1c..179b3d4 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
@@ -34,16 +34,19 @@
 import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
 import org.apache.rya.mongodb.document.operators.aggregation.AggregationUtil;
 import org.bson.Document;
+import org.bson.conversions.Bson;
 import org.eclipse.rdf4j.common.iteration.CloseableIteration;
 import org.eclipse.rdf4j.query.BindingSet;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Multimap;
+import com.mongodb.BasicDBObject;
 import com.mongodb.DBObject;
 import com.mongodb.client.AggregateIterable;
 import com.mongodb.client.MongoCollection;
-import com.mongodb.util.JSON;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.Filters;
 
 public class RyaStatementBindingSetCursorIterator implements CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> {
     private static final Logger log = Logger.getLogger(RyaStatementBindingSetCursorIterator.class);
@@ -102,7 +105,7 @@
         if (currentBatchQueryResultCursorIsValid()) {
             // convert to Rya Statement
             final Document queryResult = batchQueryResultsIterator.next();
-            final DBObject dbo = (DBObject) JSON.parse(queryResult.toJson());
+            final DBObject dbo = BasicDBObject.parse(queryResult.toJson());
             currentResultStatement = strategy.deserializeDBObject(dbo);
 
             // Find all of the queries in the executed RangeMap that this result matches
@@ -136,21 +139,23 @@
     private void submitBatchQuery() {
         int count = 0;
         executedRangeMap.clear();
-        final List<Document> pipeline = new ArrayList<>();
-        final List<DBObject> match = new ArrayList<>();
+        final List<Bson> pipeline = new ArrayList<>();
+        final List<Bson> matches = new ArrayList<>();
 
         while (queryIterator.hasNext() && count < QUERY_BATCH_SIZE){
             count++;
             final RyaStatement query = queryIterator.next();
             executedRangeMap.putAll(query, rangeMap.get(query));
             final DBObject currentQuery = strategy.getQuery(query);
-            match.add(currentQuery);
+            final Document doc = Document.parse(currentQuery.toString());
+            matches.add(doc);
         }
 
-        if (match.size() > 1) {
-            pipeline.add(new Document("$match", new Document("$or", match)));
-        } else if (match.size() == 1) {
-            pipeline.add(new Document("$match", match.get(0)));
+        final int numMatches = matches.size();
+        if (numMatches > 1) {
+            pipeline.add(Aggregates.match(Filters.or(matches)));
+        } else if (numMatches == 1) {
+            pipeline.add(Aggregates.match(matches.get(0)));
         } else {
             batchQueryResultsIterator = Iterators.emptyIterator();
             return;
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java b/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java
index a5f8b3a..439d06a 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,9 +37,9 @@
 
 import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
 import org.apache.rya.api.RdfCloudTripleStoreUtils;
+import org.apache.rya.api.domain.RyaIRI;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.domain.RyaIRI;
 import org.apache.rya.api.domain.StatementMetadata;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.api.persist.query.RyaQueryEngine;
@@ -81,7 +81,7 @@
  * expensive from a storage perspective. It is also expensive from a query
  * perspective in that three joins are required to evaluate a query that is
  * reduced to a single scan in non-reified form.
- * 
+ *
  * This class provides Rya with the ability to issue reified queries even though
  * statements are not reified. Each {@link RyaStatement} contains a
  * {@link StatementMetadata} field that allows users to store additional
@@ -112,18 +112,18 @@
 
     private StatementPattern statement;
     private Map<RyaIRI, Var> properties;
-    private Collection<StatementPattern> patterns;
-    private List<RyaIRI> uriList = Arrays.asList(TYPE_ID_URI, SUBJ_ID_URI, PRED_ID_URI, OBJ_ID_URI);
-    private C conf;
+    private final Collection<StatementPattern> patterns;
+    private final List<RyaIRI> uriList = Arrays.asList(TYPE_ID_URI, SUBJ_ID_URI, PRED_ID_URI, OBJ_ID_URI);
+    private final C conf;
     private Set<String> bindingNames;
     private RyaQueryEngine<C> queryEngine;
 
-    public StatementMetadataNode(final Collection<StatementPattern> patterns, C conf) {
+    public StatementMetadataNode(final Collection<StatementPattern> patterns, final C conf) {
         this.conf = conf;
         this.patterns = patterns;
         verifySameSubjects(patterns);
         verifyAllPredicatesAreConstants(patterns);
-        boolean correctForm = verifyHasCorrectTypePattern(patterns);
+        final boolean correctForm = verifyHasCorrectTypePattern(patterns);
         if (!correctForm) {
             throw new IllegalArgumentException("Invalid reified StatementPatterns.");
         }
@@ -132,7 +132,7 @@
 
     /**
      * Get {@link StatementPattern}s representing the underlying reified query.
-     * 
+     *
      * @return Collection of StatementPatterns
      */
     public Collection<StatementPattern> getReifiedStatementPatterns() {
@@ -148,7 +148,7 @@
      * @throws IllegalStateException
      *             If all of the Subjects are not the same.
      */
-    private static void verifySameSubjects(Collection<StatementPattern> patterns) throws IllegalStateException {
+    private static void verifySameSubjects(final Collection<StatementPattern> patterns) throws IllegalStateException {
         requireNonNull(patterns);
 
         final Iterator<StatementPattern> it = patterns.iterator();
@@ -204,7 +204,7 @@
         boolean valid = true;
         boolean contextSet = false;
         Var context = null;
-        
+
         for (final StatementPattern pattern : patterns) {
             final RyaIRI predicate = new RyaIRI(pattern.getPredicateVar().getValue().toString());
 
@@ -216,15 +216,20 @@
                     return false;
                 }
             }
-            
+
             if (predicate.equals(TYPE_ID_URI)) {
-                final RyaIRI statementID = new RyaIRI(pattern.getObjectVar().getValue().stringValue());
-                if (statementID.equals(STATEMENT_ID_URI)) {
-                    statementFound = true;
+                final Value objectValue = pattern.getObjectVar().getValue();
+                if (objectValue != null) {
+                    final RyaIRI statementID = new RyaIRI(objectValue.stringValue());
+                    if (statementID.equals(STATEMENT_ID_URI)) {
+                        statementFound = true;
+                    } else {
+                        // contains more than one Statement containing TYPE_ID_URI
+                        // as Predicate
+                        // and STATEMENT_ID_URI as Object
+                        valid = false;
+                    }
                 } else {
-                    // contains more than one Statement containing TYPE_ID_URI
-                    // as Predicate
-                    // and STATEMENT_ID_URI as Object
                     valid = false;
                 }
             }
@@ -274,20 +279,20 @@
      * the user specified metadata properties and is used for comparison with
      * the metadata properties extracted from RyaStatements passed back by the
      * {@link RyaQueryEngine}.
-     * 
+     *
      * @param patterns
      *            - collection of patterns representing a reified query
      */
-    private void setStatementPatternAndProperties(Collection<StatementPattern> patterns) {
+    private void setStatementPatternAndProperties(final Collection<StatementPattern> patterns) {
 
-        StatementPattern sp = new StatementPattern();
-        Map<RyaIRI, Var> properties = new HashMap<>();
+        final StatementPattern sp = new StatementPattern();
+        final Map<RyaIRI, Var> properties = new HashMap<>();
 
         for (final StatementPattern pattern : patterns) {
             final RyaIRI predicate = new RyaIRI(pattern.getPredicateVar().getValue().toString());
 
             if (!uriList.contains(predicate)) {
-                Var objVar = pattern.getObjectVar();
+                final Var objVar = pattern.getObjectVar();
                 properties.put(predicate, objVar);
                 continue;
             }
@@ -315,17 +320,17 @@
      * {@link RyaQueryEngine}.
      */
     @Override
-    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Collection<BindingSet> bindingset)
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset)
             throws QueryEvaluationException {
         if (bindingset.size() == 0) {
             return new EmptyIteration<>();
         }
 
         queryEngine = RyaQueryEngineFactory.getQueryEngine(conf);
-        Set<Map.Entry<RyaStatement, BindingSet>> statements = new HashSet<>();
-        Iterator<BindingSet> iter = bindingset.iterator();
+        final Set<Map.Entry<RyaStatement, BindingSet>> statements = new HashSet<>();
+        final Iterator<BindingSet> iter = bindingset.iterator();
         while (iter.hasNext()) {
-            BindingSet bs = iter.next();
+            final BindingSet bs = iter.next();
             statements.add(new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(
                     getRyaStatementFromBindings(bs), bs));
         }
@@ -333,7 +338,7 @@
         final CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> iteration;
         try {
             iteration = queryEngine.queryWithBindingSet(statements, conf);
-        } catch (RyaDAOException e) {
+        } catch (final RyaDAOException e) {
             throw new RuntimeException(e);
         }
 
@@ -344,17 +349,17 @@
      * Uses StatementPattern constraints to form a RyaStatement, and fills in
      * any null values with {@link BindingSet} values corresponding to the
      * variable for that position.
-     * 
+     *
      * @param bs
      * @return RyaStatement whose values are determined by StatementPattern and
      *         BindingSet constraints
      */
-    private RyaStatement getRyaStatementFromBindings(BindingSet bs) {
+    private RyaStatement getRyaStatementFromBindings(final BindingSet bs) {
 
-        Value subjValue = getVarValue(statement.getSubjectVar(), bs);
-        Value predValue = getVarValue(statement.getPredicateVar(), bs);
-        Value objValue = getVarValue(statement.getObjectVar(), bs);
-        Value contextValue = getVarValue(statement.getContextVar(), bs);
+        final Value subjValue = getVarValue(statement.getSubjectVar(), bs);
+        final Value predValue = getVarValue(statement.getPredicateVar(), bs);
+        final Value objValue = getVarValue(statement.getObjectVar(), bs);
+        final Value contextValue = getVarValue(statement.getContextVar(), bs);
         RyaIRI subj = null;
         RyaIRI pred = null;
         RyaType obj = null;
@@ -373,7 +378,7 @@
         if (objValue != null) {
             obj = RdfToRyaConversions.convertValue(objValue);
         }
-        
+
         if(contextValue != null) {
             context = RdfToRyaConversions.convertIRI((IRI) contextValue);
         }
@@ -386,12 +391,12 @@
      * otherwise returns the BindingSet Value corresponding to
      * {@link Var#getName()}. If no such Binding exits, this method returns
      * null.
-     * 
+     *
      * @param var
      * @param bindings
      * @return Value
      */
-    private Value getVarValue(Var var, BindingSet bindings) {
+    private Value getVarValue(final Var var, final BindingSet bindings) {
         if (var == null) {
             return null;
         } else if (var.hasValue()) {
@@ -402,20 +407,20 @@
     }
 
     @Override
-    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings)
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings)
             throws QueryEvaluationException {
         return evaluate(Collections.singleton(bindings));
     }
 
     @Override
-    public boolean equals(Object other) {
+    public boolean equals(final Object other) {
 
         if (this == other) {
             return true;
         }
 
         if (other instanceof StatementMetadataNode) {
-            StatementMetadataNode<?> meta = (StatementMetadataNode<?>) other;
+            final StatementMetadataNode<?> meta = (StatementMetadataNode<?>) other;
             if (meta.patterns.size() != this.patterns.size()) {
                 return false;
             }
@@ -424,8 +429,8 @@
                 return false;
             }
 
-            Set<StatementPattern> thisSet = new HashSet<>(patterns);
-            Set<StatementPattern> thatSet = new HashSet<>(meta.patterns);
+            final Set<StatementPattern> thisSet = new HashSet<>(patterns);
+            final Set<StatementPattern> thatSet = new HashSet<>(meta.patterns);
             return thisSet.equals(thatSet);
         } else {
             return false;
@@ -435,7 +440,7 @@
     @Override
     public int hashCode() {
         int hashcode = 0;
-        for (StatementPattern sp : patterns) {
+        for (final StatementPattern sp : patterns) {
             hashcode += sp.hashCode();
         }
         return hashcode;
@@ -465,9 +470,9 @@
     }
 
     private Set<String> getVariableNames() {
-        Set<String> vars = new HashSet<>();
-        for (StatementPattern pattern : patterns) {
-            for (Var var : pattern.getVarList()) {
+        final Set<String> vars = new HashSet<>();
+        for (final StatementPattern pattern : patterns) {
+            for (final Var var : pattern.getVarList()) {
                 if (var.getValue() == null) {
                     vars.add(var.getName());
                 }
@@ -495,16 +500,16 @@
      */
     class PropertyFilterAndBindingSetJoinIteration implements CloseableIteration<BindingSet, QueryEvaluationException> {
 
-        private CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements;
-        private Map<RyaIRI, Var> properties;
-        private StatementPattern sp;
+        private final CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements;
+        private final Map<RyaIRI, Var> properties;
+        private final StatementPattern sp;
         private BindingSet next;
         private boolean hasNextCalled = false;
         private boolean hasNext = false;
 
         public PropertyFilterAndBindingSetJoinIteration(
-                CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements,
-                Map<RyaIRI, Var> properties, StatementPattern sp) {
+                final CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements,
+                final Map<RyaIRI, Var> properties, final StatementPattern sp) {
             this.statements = statements;
             this.properties = properties;
             this.sp = sp;
@@ -562,7 +567,7 @@
         public void close() throws QueryEvaluationException {
             try {
                 statements.close();
-            } catch (RyaDAOException e) {
+            } catch (final RyaDAOException e) {
                 throw new QueryEvaluationException(e);
             }
         }
@@ -570,14 +575,14 @@
         /**
          * Fast-forwards Iteration to next valid Entry and builds the
          * BindingSet.
-         * 
+         *
          * @return BindingSet
          * @throws RyaDAOException
          */
         private Optional<BindingSet> getNext() throws RyaDAOException {
             Optional<BindingSet> optionalBs = Optional.empty();
             while (statements.hasNext() && !optionalBs.isPresent()) {
-                Map.Entry<RyaStatement, BindingSet> next = statements.next();
+                final Map.Entry<RyaStatement, BindingSet> next = statements.next();
                 optionalBs = buildBindingSet(next.getKey(), next.getValue());
             }
             return optionalBs;
@@ -590,7 +595,7 @@
          * {@link StatementMetadata} properties for the specified RyaStatement
          * and if the BindingSet built form the StatementMetadata properties can
          * be joined with specified BindingSet.
-         * 
+         *
          * @param statement
          *            - RyaStatement
          * @param bindingSet
@@ -598,15 +603,15 @@
          * @return - Optional containing BindingSet is a valid BindingSet could
          *         be built
          */
-        private Optional<BindingSet> buildBindingSet(RyaStatement statement, BindingSet bindingSet) {
+        private Optional<BindingSet> buildBindingSet(final RyaStatement statement, final BindingSet bindingSet) {
 
-            QueryBindingSet bs = new QueryBindingSet();
-            Optional<BindingSet> optPropBs = buildPropertyBindingSet(statement);
+            final QueryBindingSet bs = new QueryBindingSet();
+            final Optional<BindingSet> optPropBs = buildPropertyBindingSet(statement);
             if (!optPropBs.isPresent()) {
                 return Optional.empty();
             }
-            BindingSet propBs = optPropBs.get();
-            BindingSet spBs = buildBindingSetFromStatementPattern(statement);
+            final BindingSet propBs = optPropBs.get();
+            final BindingSet spBs = buildBindingSetFromStatementPattern(statement);
             if (!canJoinBindingSets(spBs, propBs)) {
                 return Optional.empty();
             }
@@ -625,24 +630,24 @@
          * StatementMetadata properties for specified RyaStatement. If
          * consistent, this method builds the associated BindingSet otherwise an
          * empty Optional is returned.
-         * 
+         *
          * @param statement
          * @return
          */
-        private Optional<BindingSet> buildPropertyBindingSet(RyaStatement statement) {
-            StatementMetadata metadata = statement.getMetadata();
-            Map<RyaIRI, RyaType> statementProps = metadata.getMetadata();
+        private Optional<BindingSet> buildPropertyBindingSet(final RyaStatement statement) {
+            final StatementMetadata metadata = statement.getMetadata();
+            final Map<RyaIRI, RyaType> statementProps = metadata.getMetadata();
             if (statementProps.size() < properties.size()) {
                 return Optional.empty();
             }
-            QueryBindingSet bs = new QueryBindingSet();
-            for (Map.Entry<RyaIRI, Var> entry : properties.entrySet()) {
-                RyaIRI key = entry.getKey();
-                Var var = entry.getValue();
+            final QueryBindingSet bs = new QueryBindingSet();
+            for (final Map.Entry<RyaIRI, Var> entry : properties.entrySet()) {
+                final RyaIRI key = entry.getKey();
+                final Var var = entry.getValue();
                 if (!statementProps.containsKey(key)) {
                     return Optional.empty();
                 } else {
-                    Value val = RyaToRdfConversions.convertValue(statementProps.get(key));
+                    final Value val = RyaToRdfConversions.convertValue(statementProps.get(key));
                     if (var.getValue() == null) {
                         bs.addBinding(var.getName(), val);
                     } else if (!var.getValue().equals(val)) {
@@ -661,16 +666,16 @@
          * If it doesn't have a Value, a Binding is created from the
          * RyaStatement using the {@link RyaType} for the corresponding position
          * (Subject, Predicate, Object).
-         * 
+         *
          * @param statement
          * @return BindingSet
          */
-        private BindingSet buildBindingSetFromStatementPattern(RyaStatement statement) {
-            Var subjVar = sp.getSubjectVar();
-            Var predVar = sp.getPredicateVar();
-            Var objVar = sp.getObjectVar();
-            Var contextVar = sp.getContextVar();
-            QueryBindingSet bs = new QueryBindingSet();
+        private BindingSet buildBindingSetFromStatementPattern(final RyaStatement statement) {
+            final Var subjVar = sp.getSubjectVar();
+            final Var predVar = sp.getPredicateVar();
+            final Var objVar = sp.getObjectVar();
+            final Var contextVar = sp.getContextVar();
+            final QueryBindingSet bs = new QueryBindingSet();
 
             if (subjVar.getValue() == null) {
                 bs.addBinding(subjVar.getName(), RyaToRdfConversions.convertValue(statement.getSubject()));
@@ -683,7 +688,7 @@
             if (objVar.getValue() == null) {
                 bs.addBinding(objVar.getName(), RyaToRdfConversions.convertValue(statement.getObject()));
             }
-            
+
             if (contextVar != null && contextVar.getValue() == null) {
                 bs.addBinding(contextVar.getName(), RyaToRdfConversions.convertValue(statement.getContext()));
             }
@@ -691,10 +696,10 @@
             return bs;
         }
 
-        private boolean canJoinBindingSets(BindingSet bs1, BindingSet bs2) {
-            for (Binding b : bs1) {
-                String name = b.getName();
-                Value val = b.getValue();
+        private boolean canJoinBindingSets(final BindingSet bs1, final BindingSet bs2) {
+            for (final Binding b : bs1) {
+                final String name = b.getName();
+                final Value val = b.getValue();
                 if (bs2.hasBinding(name) && (!bs2.getValue(name).equals(val))) {
                     return false;
                 }
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java
index 95ad841..4e80428 100644
--- a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java
@@ -50,12 +50,11 @@
 import org.eclipse.rdf4j.query.algebra.TupleExpr;
 
 import com.google.common.base.Preconditions;
-import com.mongodb.Block;
+import com.mongodb.BasicDBObject;
 import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
-import com.mongodb.util.JSON;
 
 /**
  * A rule execution strategy for MongoDB Rya that converts a single rule into an
@@ -83,7 +82,7 @@
      *      passed a stateful configuration, uses the existing mongo client,
      *      otherwise creates one.
      */
-    public MongoPipelineStrategy(MongoDBRdfConfiguration mongoConf) throws ForwardChainException {
+    public MongoPipelineStrategy(final MongoDBRdfConfiguration mongoConf) throws ForwardChainException {
         Preconditions.checkNotNull(mongoConf);
         final String mongoDBName = mongoConf.getMongoDBName();
         final String collectionName = mongoConf.getTriplesCollectionName();
@@ -100,7 +99,7 @@
                 this.dao = RyaSailFactory.getMongoDAO(mongoConf);
                 statefulConf = this.dao.getConf();
             }
-        } catch (RyaDAOException e) {
+        } catch (final RyaDAOException e) {
             throw new ForwardChainException("Can't connect to Rya.", e);
         }
         final MongoClient mongoClient = statefulConf.getMongoClient();
@@ -130,11 +129,11 @@
      * @throws ForwardChainException if execution fails.
      */
     @Override
-    public long executeConstructRule(AbstractConstructRule rule,
-            StatementMetadata metadata) throws ForwardChainException {
+    public long executeConstructRule(final AbstractConstructRule rule,
+            final StatementMetadata metadata) throws ForwardChainException {
         Preconditions.checkNotNull(rule);
         logger.info("Applying inference rule " + rule + "...");
-        long timestamp = System.currentTimeMillis();
+        final long timestamp = System.currentTimeMillis();
         // Get a pipeline that turns individual matches into triples
         List<Bson> pipeline = null;
         try {
@@ -149,20 +148,20 @@
             }
             pipeline = toPipeline(rule, requireSourceLevel, timestamp);
         }
-        catch (ForwardChainException e) {
+        catch (final ForwardChainException e) {
             logger.error(e);
         }
         if (pipeline == null) {
             if (backup == null) {
                 logger.error("Couldn't convert " + rule + " to pipeline:");
-                for (String line : rule.getQuery().toString().split("\n")) {
+                for (final String line : rule.getQuery().toString().split("\n")) {
                     logger.error("\t" + line);
                 }
                 throw new UnsupportedOperationException("Couldn't convert query to pipeline.");
             }
             else {
                 logger.debug("Couldn't convert " + rule + " to pipeline:");
-                for (String line : rule.getQuery().toString().split("\n")) {
+                for (final String line : rule.getQuery().toString().split("\n")) {
                     logger.debug("\t" + line);
                 }
                 logger.debug("Using fallback strategy.");
@@ -171,32 +170,30 @@
             }
         }
         // Execute the pipeline
-        for (Bson step : pipeline) {
+        for (final Bson step : pipeline) {
             logger.debug("\t" + step.toString());
         }
-        LongAdder count = new LongAdder();
+        final LongAdder count = new LongAdder();
         baseCollection.aggregate(pipeline)
             .allowDiskUse(true)
             .batchSize(PIPELINE_BATCH_SIZE)
-            .forEach(new Block<Document>() {
-                @Override
-                public void apply(Document doc) {
-                    final DBObject dbo = (DBObject) JSON.parse(doc.toJson());
-                    RyaStatement rstmt = storageStrategy.deserializeDBObject(dbo);
-                    if (!statementExists(rstmt)) {
-                        count.increment();
-                        doc.replace(SimpleMongoDBStorageStrategy.STATEMENT_METADATA, metadata.toString());
-                        try {
-                            batchWriter.addObjectToQueue(doc);
-                        } catch (MongoDbBatchWriterException e) {
-                            logger.error("Couldn't insert " + rstmt, e);
-                        }
+            .forEach((final Document doc) -> {
+                final DBObject dbo = BasicDBObject.parse(doc.toJson());
+                final RyaStatement rstmt = storageStrategy.deserializeDBObject(dbo);
+                if (!statementExists(rstmt)) {
+                    count.increment();
+                    doc.replace(SimpleMongoDBStorageStrategy.STATEMENT_METADATA, metadata.toString());
+                    try {
+                        batchWriter.addObjectToQueue(doc);
+                    } catch (final MongoDbBatchWriterException e) {
+                        logger.error("Couldn't insert " + rstmt, e);
                     }
                 }
             });
+
         try {
             batchWriter.flush();
-        } catch (MongoDbBatchWriterException e) {
+        } catch (final MongoDbBatchWriterException e) {
             throw new ForwardChainException("Error writing to Mongo", e);
         }
         logger.info("Added " + count + " new statements.");
@@ -211,10 +208,10 @@
         return count.longValue();
     }
 
-    private boolean statementExists(RyaStatement rstmt) {
+    private boolean statementExists(final RyaStatement rstmt) {
         try {
             return engine.query(new RyaQuery(rstmt)).iterator().hasNext();
-        } catch (RyaDAOException e) {
+        } catch (final RyaDAOException e) {
             logger.error("Error querying for " + rstmt, e);
             return false;
         }
@@ -231,7 +228,7 @@
         backup.shutDown();
         try {
             batchWriter.shutdown();
-        } catch (MongoDbBatchWriterException e) {
+        } catch (final MongoDbBatchWriterException e) {
             throw new ForwardChainException("Error shutting down batch writer", e);
         }
     }
@@ -247,24 +244,24 @@
      * @return An aggregation pipeline.
      * @throws ForwardChainException if pipeline construction fails.
      */
-    private List<Bson> toPipeline(AbstractConstructRule rule, int sourceLevel,
-            long timestamp) throws ForwardChainException {
+    private List<Bson> toPipeline(final AbstractConstructRule rule, final int sourceLevel,
+            final long timestamp) throws ForwardChainException {
         TupleExpr tupleExpr = rule.getQuery().getTupleExpr();
         if (!(tupleExpr instanceof QueryRoot)) {
             tupleExpr = new QueryRoot(tupleExpr);
         }
         try {
             tupleExpr.visit(pipelineVisitor);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new ForwardChainException("Error converting construct rule to an aggregation pipeline", e);
         }
         if (tupleExpr instanceof QueryRoot) {
-            QueryRoot root = (QueryRoot) tupleExpr;
+            final QueryRoot root = (QueryRoot) tupleExpr;
             if (root.getArg() instanceof AggregationPipelineQueryNode) {
-                AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) root.getArg();
+                final AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) root.getArg();
                 pipelineNode.distinct(); // require distinct triples
                 pipelineNode.requireSourceDerivationDepth(sourceLevel);
-                long latestTime = executionTimes.getOrDefault(rule, 0L);
+                final long latestTime = executionTimes.getOrDefault(rule, 0L);
                 if (latestTime > 0) {
                     pipelineNode.requireSourceTimestamp(latestTime);
                 }
diff --git a/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java
index 84080e5..efa13dc 100644
--- a/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java
+++ b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java
@@ -18,10 +18,13 @@
  */
 package org.apache.rya.forwardchain.batch;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
@@ -37,10 +40,11 @@
 import org.apache.rya.test.mongo.EmbeddedMongoFactory;
 import org.eclipse.rdf4j.model.ValueFactory;
 import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.query.AbstractTupleQueryResultHandler;
 import org.eclipse.rdf4j.query.BindingSet;
 import org.eclipse.rdf4j.query.QueryLanguage;
 import org.eclipse.rdf4j.query.TupleQuery;
-import org.eclipse.rdf4j.query.TupleQueryResult;
+import org.eclipse.rdf4j.query.TupleQueryResultHandlerException;
 import org.eclipse.rdf4j.query.impl.ListBindingSet;
 import org.eclipse.rdf4j.repository.RepositoryException;
 import org.eclipse.rdf4j.repository.sail.SailRepository;
@@ -48,11 +52,10 @@
 import org.eclipse.rdf4j.rio.RDFFormat;
 import org.eclipse.rdf4j.rio.Rio;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.io.Resources;
 import com.mongodb.MongoClient;
 import com.mongodb.ServerAddress;
@@ -84,48 +87,52 @@
     }
 
     @Test
+    public void testNoStrategy() throws Exception {
+        loadDataFiles();
+        final Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
+        final Set<BindingSet> expected = new HashSet<>();
+        assertEquals(expected, solutions);
+    }
+
+    @Test
     public void testSailStrategy() throws Exception {
-        insertDataFile(Resources.getResource("data.ttl"), "http://example.org#");
-        insertDataFile(Resources.getResource("university.ttl"), "http://example.org#");
-        insertDataFile(Resources.getResource("owlrl.ttl"), "http://example.org#");
-        Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
-        Set<BindingSet> expected = new HashSet<>();
-        Assert.assertEquals(expected, solutions);
+        loadDataFiles();
         conf.setUseAggregationPipeline(false);
-        ForwardChainSpinTool tool = new ForwardChainSpinTool();
+        final ForwardChainSpinTool tool = new ForwardChainSpinTool();
         ToolRunner.run(conf, tool, new String[] {});
-        solutions = executeQuery(Resources.getResource("query.sparql"));
-        expected.add(new ListBindingSet(Arrays.asList("X", "Y"),
+        final Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
+        final Set<BindingSet> expected = ImmutableSet.of(new ListBindingSet(Arrays.asList("X", "Y"),
             VF.createIRI(EX, "Alice"), VF.createIRI(EX, "Department1")));
-        Assert.assertEquals(expected, solutions);
+        assertEquals(expected, solutions);
         // TODO: Check if spin rules with empty WHERE clauses, such as
         // rl:scm-cls in the owlrl.ttl test file, should be included.
-        Assert.assertEquals(48, tool.getNumInferences());
+        assertEquals(48, tool.getNumInferences());
     }
 
     @Test
     public void testPipelineStrategy() throws Exception {
+        loadDataFiles();
+        conf.setUseAggregationPipeline(true);
+        final ForwardChainSpinTool tool = new ForwardChainSpinTool();
+        ToolRunner.run(conf, tool, new String[] {});
+        final Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
+        final Set<BindingSet> expected = ImmutableSet.of(new ListBindingSet(Arrays.asList("X", "Y"),
+            VF.createIRI(EX, "Alice"), VF.createIRI(EX, "Department1")));
+        assertEquals(expected, solutions);
+        // TODO: Check if spin rules with empty WHERE clauses, such as
+        // rl:scm-cls in the owlrl.ttl test file, should be included.
+        assertEquals(41, tool.getNumInferences());
+    }
+
+    private void loadDataFiles() throws Exception {
         insertDataFile(Resources.getResource("data.ttl"), "http://example.org#");
         insertDataFile(Resources.getResource("university.ttl"), "http://example.org#");
         insertDataFile(Resources.getResource("owlrl.ttl"), "http://example.org#");
-        Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
-        Set<BindingSet> expected = new HashSet<>();
-        Assert.assertEquals(expected, solutions);
-        conf.setUseAggregationPipeline(true);
-        ForwardChainSpinTool tool = new ForwardChainSpinTool();
-        ToolRunner.run(conf, tool, new String[] {});
-        solutions = executeQuery(Resources.getResource("query.sparql"));
-        expected.add(new ListBindingSet(Arrays.asList("X", "Y"),
-            VF.createIRI(EX, "Alice"), VF.createIRI(EX, "Department1")));
-        Assert.assertEquals(expected, solutions);
-        // TODO: Check if spin rules with empty WHERE clauses, such as
-        // rl:scm-cls in the owlrl.ttl test file, should be included.
-        Assert.assertEquals(41, tool.getNumInferences());
     }
 
-    private void insertDataFile(URL dataFile, String defaultNamespace) throws Exception {
-        RDFFormat format = Rio.getParserFormatForFileName(dataFile.getFile()).get();
-        SailRepositoryConnection conn = repository.getConnection();
+    private void insertDataFile(final URL dataFile, final String defaultNamespace) throws Exception {
+        final RDFFormat format = Rio.getParserFormatForFileName(dataFile.getFile()).get();
+        final SailRepositoryConnection conn = repository.getConnection();
         try {
             conn.add(dataFile, defaultNamespace, format);
         } finally {
@@ -133,29 +140,29 @@
         }
     }
 
-    private Set<BindingSet> executeQuery(URL queryFile) throws Exception {
-        SailRepositoryConnection conn = repository.getConnection();
-        try {
-            try(
-                final InputStream queryIS = queryFile.openStream();
-                final BufferedReader br = new BufferedReader(new InputStreamReader(queryIS, Charsets.UTF_8));
-            ) {
-                final String query = br.lines().collect(Collectors.joining("\n"));
-                final TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, query);
-                final TupleQueryResult result = tupleQuery.evaluate();
-                final Set<BindingSet> solutions = new HashSet<>();
-                while (result.hasNext()) {
-                    solutions.add(result.next());
+    private Set<BindingSet> executeQuery(final URL queryFile) throws Exception {
+        final SailRepositoryConnection conn = repository.getConnection();
+        try(
+            final InputStream queryIS = queryFile.openStream();
+            final BufferedReader br = new BufferedReader(new InputStreamReader(queryIS, StandardCharsets.UTF_8));
+        ) {
+            final String query = br.lines().collect(Collectors.joining("\n"));
+            final TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, query);
+            final Set<BindingSet> solutions = new HashSet<>();
+            tupleQuery.evaluate(new AbstractTupleQueryResultHandler() {
+                @Override
+                public void handleSolution(final BindingSet bindingSet) throws TupleQueryResultHandlerException {
+                    solutions.add(bindingSet);
                 }
-                return solutions;
-            }
+            });
+            return solutions;
         } finally {
             closeQuietly(conn);
         }
     }
 
     private static MongoDBRdfConfiguration getConf() throws Exception {
-        MongoDBIndexingConfigBuilder builder = MongoIndexingConfiguration.builder().setUseMockMongo(true);
+        final MongoDBIndexingConfigBuilder builder = MongoIndexingConfiguration.builder().setUseMockMongo(true);
         final MongoClient c = EmbeddedMongoFactory.newFactory().newMongoClient();
         final ServerAddress address = c.getAddress();
         builder.setMongoHost(address.getHost());