RYA-377 Code review.
diff --git a/common/rya.api.function/pom.xml b/common/rya.api.function/pom.xml
index ce88e36..5b7ee0a 100644
--- a/common/rya.api.function/pom.xml
+++ b/common/rya.api.function/pom.xml
@@ -27,8 +27,8 @@
<version>3.2.12-incubating-SNAPSHOT</version>
</parent>
- <artifactId>rya.api.function</artifactId>
- <name>Apache Rya Common API - Functions</name>
+ <artifactId>rya.api.evaluation</artifactId>
+ <name>Apache Rya Common API - Evaluation Functions</name>
<dependencies>
<!-- Rya dependencies. -->
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
index a73d5ac..4a31fce 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
@@ -19,6 +19,7 @@
package org.apache.rya.api.function.aggregation;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -51,6 +52,8 @@
@Override
public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements.");
+ requireNonNull(state);
+ requireNonNull(childBindingSet);
// Only update the average if the child contains the binding that we are averaging.
final String aggregatedName = aggregation.getAggregatedBindingName();
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
index 7dd5b21..879df5e 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
@@ -19,6 +19,7 @@
package org.apache.rya.api.function.aggregation;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
import java.math.BigInteger;
@@ -39,6 +40,8 @@
@Override
public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements.");
+ requireNonNull(state);
+ requireNonNull(childBindingSet);
// Only add one to the count if the child contains the binding that we are counting.
final String aggregatedName = aggregation.getAggregatedBindingName();
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
index 3295fbb..5b5d493 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
@@ -19,6 +19,7 @@
package org.apache.rya.api.function.aggregation;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.openrdf.model.Value;
@@ -40,6 +41,8 @@
@Override
public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements.");
+ requireNonNull(state);
+ requireNonNull(childBindingSet);
// Only update the max if the child contains the binding that we are finding the max value for.
final String aggregatedName = aggregation.getAggregatedBindingName();
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
index d6bf751..f1b083c 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
@@ -19,6 +19,7 @@
package org.apache.rya.api.function.aggregation;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.openrdf.model.Value;
@@ -40,6 +41,8 @@
@Override
public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements.");
+ requireNonNull(state);
+ requireNonNull(childBindingSet);
// Only update the min if the child contains the binding that we are finding the min value for.
final String aggregatedName = aggregation.getAggregatedBindingName();
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
index 97735f2..7ddc9ae 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
@@ -19,6 +19,7 @@
package org.apache.rya.api.function.aggregation;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
import java.math.BigInteger;
@@ -48,6 +49,8 @@
@Override
public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements.");
+ requireNonNull(state);
+ requireNonNull(childBindingSet);
// Only add values to the sum if the child contains the binding that we are summing.
final String aggregatedName = aggregation.getAggregatedBindingName();
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java b/common/rya.api/src/main/java/org/apache/rya/api/utils/CloseableIterator.java
similarity index 72%
rename from extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
rename to common/rya.api/src/main/java/org/apache/rya/api/utils/CloseableIterator.java
index 9ea927d..c29f5e0 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/utils/CloseableIterator.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,17 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.rya.streams.kafka.processors.join;
+package org.apache.rya.api.utils;
import java.util.Iterator;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
/**
- * An {@link Iterator} that is also {@link AutoCloseable}.
+ * An {@link Iterator} that also extends {@link AutoCloseable} because it has reference to resources
+ * that need to be released once you are done iterating.
*
- * @param <T> - The type of elements that will be iterated over.
+ * @param <T> The type of object that is iterated over.
*/
-@DefaultAnnotation(NonNull.class)
-public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { }
\ No newline at end of file
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
+
+}
\ No newline at end of file
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
index 40941c8..5028454 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
@@ -27,11 +27,11 @@
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.api.client.Install.InstallConfiguration;
import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.sail.config.RyaSailFactory;
import org.junit.Test;
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
index ae586da..dcc47b6 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
@@ -22,8 +22,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.periodic.notification.api.BinPruner;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.NodeBin;
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
index 92e3276..cd06f2a 100644
--- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
@@ -54,6 +54,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
@@ -61,7 +62,6 @@
import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.apache.rya.periodic.notification.notification.CommandNotification;
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
index 830fa46..ac2202c 100644
--- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
@@ -39,6 +39,7 @@
import org.apache.fluo.api.data.Span;
import org.apache.fluo.core.client.FluoClientImpl;
import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
@@ -48,7 +49,6 @@
import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
@@ -68,7 +68,7 @@
public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
-
+
@Test
public void periodicPrunerTest() throws Exception {
@@ -238,7 +238,7 @@
pruner.stop();
}
-
+
private void compareResults(PeriodicQueryResultStorage periodicStorage, String queryId, long bin, Set<BindingSet> expected) throws PeriodicQueryStorageException, Exception {
try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(bin))) {
Set<BindingSet> actual = new HashSet<>();
@@ -248,13 +248,13 @@
Assert.assertEquals(expected, actual);
}
}
-
+
private void compareFluoCounts(FluoClient client, String pcjId, long bin) {
QueryBindingSet bs = new QueryBindingSet();
bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG));
-
+
VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID);
-
+
try(Snapshot sx = client.newSnapshot()) {
String fluoQueryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
Set<String> ids = new HashSet<>();
@@ -279,5 +279,5 @@
}
}
}
-
+
}
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
index 6637dde..2936738 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
@@ -23,7 +23,7 @@
import java.util.Optional;
import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.openrdf.query.BindingSet;
@@ -32,7 +32,7 @@
*
*/
public interface PeriodicQueryResultStorage {
-
+
/**
* Binding name for the periodic bin id
*/
@@ -45,27 +45,27 @@
* @throws PeriodicQueryStorageException
*/
public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException;
-
+
/**
* Creates a PeriodicQuery result storage layer for the given SPARQL query with the given id
* @param queryId - id of the storage layer for the given SPARQL query
* @param sparql - SPARQL query whose periodic results will be stored
- * @return - id of the storage layer
+ * @return - id of the storage layer
* @throws PeriodicQueryStorageException
*/
public String createPeriodicQuery(String queryId, String sparql) throws PeriodicQueryStorageException;
-
+
/**
* Creates a PeriodicQuery result storage layer for the given SPARQL query with the given id
* whose results are written in the order indicated by the specified VariableOrder.
* @param queryId - id of the storage layer for the given SPARQL query
* @param sparql - SPARQL query whose periodic results will be stored
* @param varOrder - VariableOrder indicating the order that results will be written in
- * @return - id of the storage layer
+ * @return - id of the storage layer
* @throws PeriodicQueryStorageException
*/
public void createPeriodicQuery(String queryId, String sparql, VariableOrder varOrder) throws PeriodicQueryStorageException;
-
+
/**
* Retrieve the {@link PeriodicQueryStorageMetdata} for the give query id
* @param queryID - id of the query whose metadata will be returned
@@ -73,7 +73,7 @@
* @throws PeriodicQueryStorageException
*/
public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(String queryID) throws PeriodicQueryStorageException;;
-
+
/**
* Add periodic query results to the storage layer indicated by the given query id
* @param queryId - id indicating the storage layer that results will be added to
@@ -81,7 +81,7 @@
* @throws PeriodicQueryStorageException
*/
public void addPeriodicQueryResults(String queryId, Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException;;
-
+
/**
* Deletes periodic query results from the storage layer
* @param queryId - id indicating the storage layer that results will be deleted from
@@ -89,14 +89,14 @@
* @throws PeriodicQueryStorageException
*/
public void deletePeriodicQueryResults(String queryId, long binID) throws PeriodicQueryStorageException;;
-
+
/**
- * Deletes all results for the storage layer indicated by the given query id
+ * Deletes all results for the storage layer indicated by the given query id
* @param queryID - id indicating the storage layer whose results will be deleted
* @throws PeriodicQueryStorageException
*/
public void deletePeriodicQuery(String queryID) throws PeriodicQueryStorageException;;
-
+
/**
* List results in the given storage layer indicated by the query id
* @param queryId - id indicating the storage layer whose results will be listed
@@ -105,11 +105,11 @@
* @throws PeriodicQueryStorageException
*/
public CloseableIterator<BindingSet> listResults(String queryId, Optional<Long> binID) throws PeriodicQueryStorageException;;
-
+
/**
* List all storage tables containing periodic results.
* @return List of Strings with names of all tables containing periodic results
*/
public List<String> listPeriodicTables();
-
+
}
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
index 4988035..70c8b0e 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
@@ -23,6 +23,7 @@
import java.util.List;
import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
import org.openrdf.query.BindingSet;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -103,16 +104,6 @@
public void close() throws PCJStorageException;
/**
- * An {@link Iterator} that also extends {@link AutoCloseable} because it has reference to resources
- * that need to be released once you are done iterating.
- *
- * @param <T> The type of object that is iterated over.
- */
- public static interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
-
- }
-
- /**
* An operation of {@link PrecomputedJoinStorage} failed.
*/
public static class PCJStorageException extends PcjException {
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
index 3d0f11b..f3d078d 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
@@ -39,6 +39,7 @@
import org.apache.rya.api.instance.RyaDetailsUpdater;
import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
index f8547f5..8124aff 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
@@ -38,11 +38,11 @@
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.openrdf.model.impl.LiteralImpl;
import org.openrdf.model.vocabulary.XMLSchema;
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
index ff8ff14..c488d36 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
@@ -25,8 +25,8 @@
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.openrdf.query.BindingSet;
/**
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
index 40db32a..9346c00 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
@@ -56,10 +56,9 @@
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.openrdf.query.BindingSet;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.QueryEvaluationException;
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
index 26fd8c9..b457dfd 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
@@ -27,7 +27,7 @@
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.openrdf.query.BindingSet;
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java
index e689f9d..b95c812 100644
--- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java
@@ -45,9 +45,9 @@
import org.apache.rya.accumulo.RyaTestInstanceRule;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
@@ -120,7 +120,7 @@
private String getRyaInstanceName() {
return testInstance.getRyaInstanceName();
}
-
+
/**
* Format a Mini Accumulo to be a Rya repository.
*
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
index 5ba5e40..33571f7 100644
--- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
@@ -39,9 +39,9 @@
import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java
index 1e21bf2..2d9da4d 100644
--- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java
@@ -30,10 +30,10 @@
import org.apache.accumulo.core.security.Authorizations;
import org.apache.rya.accumulo.AccumuloITBase;
import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.PeriodicQueryTableNameFactory;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
@@ -53,86 +53,86 @@
private static final String RYA = "rya_";
private static final PeriodicQueryTableNameFactory nameFactory = new PeriodicQueryTableNameFactory();
private static final ValueFactory vf = new ValueFactoryImpl();
-
+
@Before
public void init() throws AccumuloException, AccumuloSecurityException {
super.getConnector().securityOperations().changeUserAuthorizations("root", new Authorizations("U"));
periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getConnector(), RYA);
}
-
-
+
+
@Test
public void testCreateAndMeta() throws PeriodicQueryStorageException {
-
+
String sparql = "select ?x where { ?x <urn:pred> ?y.}";
VariableOrder varOrder = new VariableOrder("periodicBinId", "x");
PeriodicQueryStorageMetadata expectedMeta = new PeriodicQueryStorageMetadata(sparql, varOrder);
-
+
String id = periodicStorage.createPeriodicQuery(sparql);
Assert.assertEquals(expectedMeta, periodicStorage.getPeriodicQueryMetadata(id));
Assert.assertEquals(Arrays.asList(nameFactory.makeTableName(RYA, id)), periodicStorage.listPeriodicTables());
periodicStorage.deletePeriodicQuery(id);
}
-
-
+
+
@Test
public void testAddListDelete() throws Exception {
-
+
String sparql = "select ?x where { ?x <urn:pred> ?y.}";
String id = periodicStorage.createPeriodicQuery(sparql);
-
+
Set<BindingSet> expected = new HashSet<>();
Set<VisibilityBindingSet> storageSet = new HashSet<>();
-
+
//add result matching user's visibility
QueryBindingSet bs = new QueryBindingSet();
bs.addBinding("periodicBinId", vf.createLiteral(1L));
bs.addBinding("x",vf.createURI("uri:uri123"));
expected.add(bs);
storageSet.add(new VisibilityBindingSet(bs,"U"));
-
+
//add result with different visibility that is not expected
bs = new QueryBindingSet();
bs.addBinding("periodicBinId", vf.createLiteral(1L));
bs.addBinding("x",vf.createURI("uri:uri456"));
storageSet.add(new VisibilityBindingSet(bs,"V"));
-
+
periodicStorage.addPeriodicQueryResults(id, storageSet);
-
+
Set<BindingSet> actual = new HashSet<>();
try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(1L))) {
iter.forEachRemaining(x -> actual.add(x));
}
-
+
Assert.assertEquals(expected, actual);
-
+
periodicStorage.deletePeriodicQueryResults(id, 1L);
-
+
Set<BindingSet> actual2 = new HashSet<>();
try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(1L))) {
iter.forEachRemaining(x -> actual2.add(x));
}
-
+
Assert.assertEquals(new HashSet<>(), actual2);
periodicStorage.deletePeriodicQuery(id);
-
+
}
-
+
@Test
public void multiBinTest() throws PeriodicQueryStorageException, Exception {
-
+
String sparql = "prefix function: <http://org.apache.rya/function#> " //n
+ "prefix time: <http://www.w3.org/2006/time#> " //n
+ "select ?id (count(?obs) as ?total) where {" //n
+ "Filter(function:periodic(?time, 2, .5, time:hours)) " //n
+ "?obs <uri:hasTime> ?time. " //n
+ "?obs <uri:hasId> ?id } group by ?id"; //n
-
-
+
+
final ValueFactory vf = new ValueFactoryImpl();
long currentTime = System.currentTimeMillis();
String queryId = UUID.randomUUID().toString().replace("-", "");
-
+
// Create the expected results of the SPARQL query once the PCJ has been computed.
final Set<BindingSet> expected1 = new HashSet<>();
final Set<BindingSet> expected2 = new HashSet<>();
@@ -142,81 +142,81 @@
long period = 1800000;
long binId = (currentTime/period)*period;
-
+
MapBindingSet bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
bs.addBinding("periodicBinId", vf.createLiteral(binId));
expected1.add(bs);
storageResults.add(new VisibilityBindingSet(bs));
-
+
bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
bs.addBinding("periodicBinId", vf.createLiteral(binId));
expected1.add(bs);
storageResults.add(new VisibilityBindingSet(bs));
-
+
bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
bs.addBinding("periodicBinId", vf.createLiteral(binId));
expected1.add(bs);
storageResults.add(new VisibilityBindingSet(bs));
-
+
bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING));
bs.addBinding("periodicBinId", vf.createLiteral(binId));
expected1.add(bs);
storageResults.add(new VisibilityBindingSet(bs));
-
+
bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
expected2.add(bs);
storageResults.add(new VisibilityBindingSet(bs));
-
+
bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
expected2.add(bs);
storageResults.add(new VisibilityBindingSet(bs));
-
+
bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
expected2.add(bs);
storageResults.add(new VisibilityBindingSet(bs));
-
+
bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
expected3.add(bs);
storageResults.add(new VisibilityBindingSet(bs));
-
+
bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
expected3.add(bs);
storageResults.add(new VisibilityBindingSet(bs));
-
+
bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period));
expected4.add(bs);
storageResults.add(new VisibilityBindingSet(bs));
-
-
+
+
String id = periodicStorage.createPeriodicQuery(queryId, sparql);
periodicStorage.addPeriodicQueryResults(queryId, storageResults);
-
+
try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId))) {
Set<BindingSet> actual1 = new HashSet<>();
while(iter.hasNext()) {
@@ -224,7 +224,7 @@
}
Assert.assertEquals(expected1, actual1);
}
-
+
periodicStorage.deletePeriodicQueryResults(queryId, binId);
try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId))) {
Set<BindingSet> actual1 = new HashSet<>();
@@ -233,7 +233,7 @@
}
Assert.assertEquals(Collections.emptySet(), actual1);
}
-
+
try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + period))) {
Set<BindingSet> actual2 = new HashSet<>();
while(iter.hasNext()) {
@@ -241,7 +241,7 @@
}
Assert.assertEquals(expected2, actual2);
}
-
+
periodicStorage.deletePeriodicQueryResults(queryId, binId + period);
try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + period))) {
Set<BindingSet> actual2 = new HashSet<>();
@@ -250,7 +250,7 @@
}
Assert.assertEquals(Collections.emptySet(), actual2);
}
-
+
try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + 2*period))) {
Set<BindingSet> actual3 = new HashSet<>();
while(iter.hasNext()) {
@@ -258,7 +258,7 @@
}
Assert.assertEquals(expected3, actual3);
}
-
+
try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + 3*period))) {
Set<BindingSet> actual4 = new HashSet<>();
while(iter.hasNext()) {
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index f2e8cf9..5493a5f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -62,7 +62,7 @@
<dependency>
<groupId>org.apache.rya</groupId>
- <artifactId>rya.api.function</artifactId>
+ <artifactId>rya.api.evaluation</artifactId>
</dependency>
<!-- 3rd Party Runtime Dependencies. -->
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index 3fea6ed..181f322 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -33,12 +33,12 @@
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.openrdf.model.Statement;
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index d623043..866d32b 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -29,10 +29,10 @@
import org.apache.fluo.api.client.FluoFactory;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 3e72f1b..610f502 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -39,11 +39,11 @@
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.api.functions.DateTimeWithinPeriod;
import org.apache.rya.api.functions.OWLTime;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
index 5cd3ab1..65083e8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -27,10 +27,10 @@
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.external.PrecomputedJoinIndexer;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
index e83a894..6135920 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
@@ -28,9 +28,9 @@
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.log4j.Logger;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
index 8529bd5..90ed01a 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -47,12 +47,12 @@
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
diff --git a/extras/rya.streams/api/pom.xml b/extras/rya.streams/api/pom.xml
index 250028f..55c0e79 100644
--- a/extras/rya.streams/api/pom.xml
+++ b/extras/rya.streams/api/pom.xml
@@ -54,11 +54,6 @@
<artifactId>guava</artifactId>
</dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
<!-- Test dependences -->
<dependency>
<groupId>junit</groupId>
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index 778630d..16b07b2 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -60,7 +60,7 @@
</dependency>
<dependency>
<groupId>org.apache.rya</groupId>
- <artifactId>rya.api.function</artifactId>
+ <artifactId>rya.api.evaluation</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rya</groupId>
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
index 5f7a06b..124bc76 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
@@ -36,7 +36,7 @@
* {@link VisibilityBindingSet} because some downstream processors require more information about
* which upstream processor is emitting the result in order to do their work.
* </p>
- * Currently there are only two types processors:
+ * Currently there are only two types of processors:
* <ul>
* <li>Unary Processor - A processor that only has a single upstream node feeding it input.</li>
* <li>Binary Processor - A processor that has two upstream nodes feeding it input.</li>
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
index 9ed2363..367ca6f 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
@@ -28,6 +28,7 @@
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.rya.api.function.join.IterativeJoin;
import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.streams.kafka.processors.ProcessorResult;
import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
@@ -75,7 +76,7 @@
this.allVars = requireNonNull(allVars);
if(!allVars.subList(0, joinVars.size()).equals(joinVars)) {
- throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " +
+ throw new IllegalArgumentException("The allVars list must start with the joinVars list, but it did not. " +
"Join Vars: " + joinVars + ", All Vars: " + allVars);
}
}
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
index 17a6ebb..2afc1d8 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
@@ -19,6 +19,7 @@
package org.apache.rya.streams.kafka.processors.join;
import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
index d12957a..254f226 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
@@ -28,6 +28,7 @@
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
import org.openrdf.query.impl.MapBindingSet;
diff --git a/extras/rya.streams/pom.xml b/extras/rya.streams/pom.xml
index dd876a0..93b6b1c 100644
--- a/extras/rya.streams/pom.xml
+++ b/extras/rya.streams/pom.xml
@@ -38,7 +38,15 @@
<module>kafka-test</module>
<module>api</module>
<module>client</module>
- <module>geo</module>
<module>integration</module>
</modules>
+
+ <profiles>
+ <profile>
+ <id>geoindexing</id>
+ <modules>
+ <module>geo</module>
+ </modules>
+ </profile>
+ </profiles>
</project>
diff --git a/pom.xml b/pom.xml
index 31b17f8..99640f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -217,7 +217,7 @@
</dependency>
<dependency>
<groupId>org.apache.rya</groupId>
- <artifactId>rya.api.function</artifactId>
+ <artifactId>rya.api.evaluation</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>