RYA-377 Implement the Aggregation Processor for Rya Streams.
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationStateStore.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationStateStore.java
new file mode 100644
index 0000000..d37f4c7
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationStateStore.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import java.util.Optional;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides a mechanism for storing the updating {@link AggregationState} while using an {@link AggregationsEvaluator}.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface AggregationStateStore {
+
+ /**
+ * Stores a state. If this value updates a previously stored state, then it will overwrite the old value
+ * with the new one.
+ *
+ * @param state - The state that will be stored. (not null)
+ */
+ public void store(AggregationState state);
+
+ /**
+ * Get the {@link AggregationState} that may be updatted using the provided binding set.
+ *
+ * @param bs - A binding set that defines which state to fetch. (not null)
+ * @return The {@link AggregationState} that is updated by the binding set, if one has been stored.
+ */
+ public Optional<AggregationState> get(VisibilityBindingSet bs);
+}
\ No newline at end of file
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationsEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationsEvaluator.java
new file mode 100644
index 0000000..2aa716f
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationsEvaluator.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.visibility.VisibilitySimplifier;
+import org.openrdf.query.algebra.AggregateOperator;
+import org.openrdf.query.algebra.Group;
+import org.openrdf.query.algebra.GroupElem;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.ImmutableMap;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A stateful evaluator that processes aggregation functions over variables that are grouped together.
+ * </p>
+ * The following aggregation functions are supported:
+ * <ul>
+ * <li>Count</li>
+ * <li>Sum</li>
+ * <li>Average</li>
+ * <li>Min</li>
+ * <li>Max</li>
+ * </ul>
+ * </p>
+ * The persistence of the aggregation's state is determined by the provided {@link AggregationStateStore}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AggregationsEvaluator {
+
+ private static final ImmutableMap<AggregationType, AggregationFunction> FUNCTIONS;
+ static {
+ final ImmutableMap.Builder<AggregationType, AggregationFunction> builder = ImmutableMap.builder();
+ builder.put(AggregationType.COUNT, new CountFunction());
+ builder.put(AggregationType.SUM, new SumFunction());
+ builder.put(AggregationType.AVERAGE, new AverageFunction());
+ builder.put(AggregationType.MIN, new MinFunction());
+ builder.put(AggregationType.MAX, new MaxFunction());
+ FUNCTIONS = builder.build();
+ }
+
+ private final AggregationStateStore aggStateStore;
+ private final Collection<AggregationElement> aggregations;
+ private final List<String> groupByVars;
+
+ /**
+ * Constructs an instance of {@link AggregationsEvaluator}.
+ *
+ * @param aggStateStore - The mechanism for storing aggregation state. (not null)
+ * @param aggregations - The aggregation functions that will be computed. (not null)
+ * @param groupByVars - The names of the binding whose values are used to group aggregation results. (not null)
+ */
+ public AggregationsEvaluator(
+ final AggregationStateStore aggStateStore,
+ final Collection<AggregationElement> aggregations,
+ final List<String> groupByVars) {
+ this.aggStateStore = requireNonNull(aggStateStore);
+ this.aggregations = requireNonNull(aggregations);
+ this.groupByVars = requireNonNull(groupByVars);
+ }
+
+ /**
+ * Make an instance of {@link AggregationsEvaluator} based on a {@link Group} node.
+ *
+ * @param aggStateStore - The mechanism for storing aggregation state. (not null)
+ * @param aggNode - Defines which aggregation functions need to be performed.
+ * @param groupByVars - The names of the binding whose values are used to group aggregation results. (not null)
+ * @return The evaluator that handles the node's aggregations.
+ */
+ public static AggregationsEvaluator make(final AggregationStateStore aggStateStore, final Group aggNode, final List<String> groupByVars) {
+ requireNonNull(aggStateStore);
+ requireNonNull(aggNode);
+ requireNonNull(groupByVars);
+
+ // The aggregations that need to be performed are the Group Elements.
+ final List<AggregationElement> aggregations = new ArrayList<>();
+ for(final GroupElem groupElem : aggNode.getGroupElements()) {
+ // Figure out the type of the aggregation.
+ final AggregateOperator operator = groupElem.getOperator();
+ final Optional<AggregationType> type = AggregationType.byOperatorClass( operator.getClass() );
+
+ // If the type is one we support, create the AggregationElement.
+ if(type.isPresent()) {
+ final String resultBindingName = groupElem.getName();
+
+ final AtomicReference<String> aggregatedBindingName = new AtomicReference<>();
+ groupElem.visitChildren(new QueryModelVisitorBase<RuntimeException>() {
+ @Override
+ public void meet(final Var node) {
+ aggregatedBindingName.set( node.getName() );
+ }
+ });
+
+ aggregations.add( new AggregationElement(type.get(), aggregatedBindingName.get(), resultBindingName) );
+ }
+ }
+
+ return new AggregationsEvaluator(aggStateStore, aggregations, groupByVars);
+ }
+
+ /**
+ * Update the aggregation values using information found within {@code newBs}.
+ *
+ * @param newBs - A binding set whose values need to be incorporated within the aggregations. (not null)
+ * @return A binding set containing the updated aggregation values.
+ */
+ public VisibilityBindingSet update(final VisibilityBindingSet newBs) {
+ requireNonNull(newBs);
+
+ // Load the old state if one was previously stored; otherwise initialize the state.
+ final AggregationState state = aggStateStore.get(newBs).orElseGet(() -> {
+ // Initialize a new state.
+ final AggregationState newState = new AggregationState();
+
+ // If we have group by bindings, their values need to be added to the state's binding set.
+ final MapBindingSet bindingSet = newState.getBindingSet();
+ for(final String groupByVar : groupByVars) {
+ bindingSet.addBinding( newBs.getBinding(groupByVar) );
+ }
+
+ return newState;
+ });
+
+ // Update the visibilities of the result binding set based on the new result's visibilities.
+ final String oldVisibility = state.getVisibility();
+ final String updateVisibilities = VisibilitySimplifier.unionAndSimplify(oldVisibility, newBs.getVisibility());
+ state.setVisibility(updateVisibilities);
+
+ // Update the Aggregation State with each Aggregation function included within this group.
+ for(final AggregationElement aggregation : aggregations) {
+ final AggregationType type = aggregation.getAggregationType();
+ final AggregationFunction function = FUNCTIONS.get(type);
+ if(function == null) {
+ throw new RuntimeException("Unrecognized aggregation function: " + type);
+ }
+
+ function.update(aggregation, state, newBs);
+ }
+
+ // Store the updated state. This will write on top of any old state that was present for the Group By values.
+ aggStateStore.store(state);
+
+ // Return the updated binding set from the updated state.
+ return new VisibilityBindingSet(state.getBindingSet(), state.getVisibility());
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorSupplier.java
new file mode 100644
index 0000000..c8e1049
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorSupplier.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.rya.api.function.aggregation.AggregationState;
+import org.apache.rya.api.function.aggregation.AggregationStateStore;
+import org.apache.rya.api.function.aggregation.AggregationsEvaluator;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.ResultType;
+import org.apache.rya.streams.kafka.processors.ProcessorResultFactory;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier;
+import org.openrdf.query.algebra.Group;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.internal.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Supplies {@link AggregationProcessor} instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AggregationProcessorSupplier extends RyaStreamsProcessorSupplier {
+
+ private final String stateStoreName;
+ private final Group aggNode;
+
+ /**
+ * Constructs an instance of {@link AggregationProcessorSupplier}.
+ *
+ * @param stateStoreName - The name of the state store the processor will use. (not null)
+ * @param aggNode - Defines which aggregations will be performed by the processor. (not null)
+ * @param resultFactory - The factory that the supplied processors will use to create results. (not null)
+ */
+ public AggregationProcessorSupplier(
+ final String stateStoreName,
+ final Group aggNode,
+ final ProcessorResultFactory resultFactory) {
+ super(resultFactory);
+ this.stateStoreName = requireNonNull(stateStoreName);
+ this.aggNode = requireNonNull(aggNode);
+ }
+
+ @Override
+ public Processor<Object, ProcessorResult> get() {
+ return new AggregationProcessor(stateStoreName, aggNode, super.getResultFactory());
+ }
+
+ /**
+ * Evaluates a {@link Group} node that contains a bunch of aggregations. Each aggregation will have a binding
+ * within the resulting binding sets that contains the aggregation value.
+ *
+ * @see AggregationsEvaluator
+ */
+ @DefaultAnnotation(NonNull.class)
+ public static class AggregationProcessor extends RyaStreamsProcessor {
+ private static final Logger log = LoggerFactory.getLogger(AggregationProcessor.class);
+
+ private final String stateStoreName;
+ private final Group aggNode;
+
+ private ProcessorContext context;
+ private AggregationStateStore aggStateStore;
+ private AggregationsEvaluator evaluator;
+
+ /**
+ * Constructs an instance of {@link AggregationProcessor}.
+ *
+ * @param stateStoreName - The name of the Kafka Streams state store that this processor will use. (not null)
+ * @param aggNode - The group by node that configures how the aggregations will be performed. (not null)
+ * @param resultFactory - The factory that will format this processor's final results for the downstream
+ * processor. (not null)
+ */
+ public AggregationProcessor(
+ final String stateStoreName,
+ final Group aggNode,
+ final ProcessorResultFactory resultFactory) {
+ super(resultFactory);
+ this.stateStoreName = requireNonNull(stateStoreName);
+ this.aggNode = requireNonNull(aggNode);
+ }
+
+ @Override
+ public void init(final ProcessorContext context) {
+ this.context = context;
+
+ // Sort the group by vars so that they will be written to the state store in the same order every time.
+ final List<String> groupByVars = Lists.newArrayList(aggNode.getGroupBindingNames());
+ groupByVars.sort(Comparator.naturalOrder());
+
+ // Get a reference to the state store that keeps track of aggregation state.
+ final KeyValueStore<String, AggregationState> stateStore =
+ (KeyValueStore<String, AggregationState>) context.getStateStore( stateStoreName );
+ aggStateStore = new KeyValueAggregationStateStore(stateStore, groupByVars);
+
+ // Create the aggregation evaluator.
+ evaluator = AggregationsEvaluator.make(aggStateStore, aggNode, groupByVars);
+ }
+
+ @Override
+ public void process(final Object key, final ProcessorResult value) {
+ // Aggregations can only be unary.
+ if (value.getType() != ResultType.UNARY) {
+ throw new RuntimeException("The ProcessorResult to be processed must be Unary.");
+ }
+
+ // Log the binding set that has been input.
+ log.debug("\nINPUT:\nBinding Set: {}", value.getUnary().getResult());
+
+ // Update the aggregations values.
+ final VisibilityBindingSet resultBs = evaluator.update(value.getUnary().getResult());
+
+ // Log the binding set that will be output.
+ log.debug("\nOUTPUT:\nBinding Set: {}", resultBs);
+
+ // Forward to the updated aggregation binding set to the downstream processors.
+ context.forward(key, super.getResultFactory().make(resultBs));
+ }
+
+ @Override
+ public void punctuate(final long timestamp) {
+ // Do nothing.
+ }
+
+ @Override
+ public void close() {
+ // Do nothing.
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/KeyValueAggregationStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/KeyValueAggregationStateStore.java
new file mode 100644
index 0000000..3300590
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/KeyValueAggregationStateStore.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.rya.api.function.aggregation.AggregationState;
+import org.apache.rya.api.function.aggregation.AggregationStateStore;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Joiner;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link KeyValueStore} implementation of {@link AggregationStateStore}.
+ * </p>
+ * This is a key/value store, so we need to store the {@link AggregationState} for each set of group by values
+ * using a key that is composed with those values. We use the following pattern to accomplish this:
+ * <pre>
+ * [groupByVar1 value],[groupByVar2 value],...,[groupByVarN value]
+ * </pre>
+ */
+@DefaultAnnotation(NonNull.class)
+public class KeyValueAggregationStateStore implements AggregationStateStore {
+
+ private final KeyValueStore<String, AggregationState> store;
+ private final List<String> groupByVars;
+
+ /**
+ * Constructs an instance of {@link KeyValueAggregationStateStore}.
+ *
+ * @param store - The state store that will be used. (not null)
+ * @param groupByVars - An ordered list of group by variable names. (not null)
+ */
+ public KeyValueAggregationStateStore(
+ final KeyValueStore<String, AggregationState> store,
+ final List<String> groupByVars) {
+ this.store = requireNonNull(store);
+ this.groupByVars = requireNonNull(groupByVars);
+ }
+
+ @Override
+ public void store(final AggregationState state) {
+ requireNonNull(state);
+
+ // Aggregations group their states by their group by variables, so the key is the resulting binding
+ // set's values for the group by variables.
+ final String key = makeCommaDelimitedValues(groupByVars, state.getBindingSet());
+ store.put(key, state);
+ }
+
+ @Override
+ public Optional<AggregationState> get(final VisibilityBindingSet bs) {
+ requireNonNull(bs);
+
+ final String key = makeCommaDelimitedValues(groupByVars, bs);
+ return Optional.ofNullable(store.get(key));
+ }
+
+ /**
+ * A utility function that helps construct the keys used by {@link KeyValueAggregationStateStore}.
+ *
+ * @param vars - Which variables within the binding set to use for the key's values. (not null)
+ * @param bindingSet - The binding set the key is being constructed from. (not null)
+ * @return A comma delimited list of the binding values, leading with the side.
+ */
+ private static String makeCommaDelimitedValues(final List<String> vars, final BindingSet bindingSet) {
+ requireNonNull(vars);
+ requireNonNull(bindingSet);
+
+ // Make a an ordered list of the binding set variables.
+ final List<String> values = new ArrayList<>();
+ for(final String var : vars) {
+ values.add( bindingSet.hasBinding(var) ? bindingSet.getBinding(var).getValue().toString() : "" );
+ }
+
+ // Return a comma delimited list of those values.
+ return Joiner.on(",").join(values);
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
index 426b041..4046e23 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
@@ -51,6 +51,7 @@
import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier;
import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier;
import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier;
import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier;
@@ -65,6 +66,7 @@
import org.openrdf.query.algebra.BinaryTupleOperator;
import org.openrdf.query.algebra.Extension;
import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.Group;
import org.openrdf.query.algebra.Join;
import org.openrdf.query.algebra.LeftJoin;
import org.openrdf.query.algebra.MultiProjection;
@@ -94,6 +96,7 @@
private static final String JOIN_PREFIX = "JOIN_";
private static final String PROJECTION_PREFIX = "PROJECTION_";
private static final String FILTER_PREFIX = "FILTER_";
+ private static final String AGGREGATION_PREFIX = "AGGREGATION_";
private static final String SINK = "SINK";
private List<ProcessorEntry> processorEntryList;
@@ -141,14 +144,15 @@
builder.addProcessor(entry.getID(), entry.getSupplier(), parentIDs);
}
- if (entry.getNode() instanceof Join || entry.getNode() instanceof LeftJoin) {
+ // Add a state store for any node type that requires one.
+ if (entry.getNode() instanceof Join || entry.getNode() instanceof LeftJoin || entry.getNode() instanceof Group) {
// Add a state store for the join processor.
final StateStoreSupplier joinStoreSupplier =
Stores.create( entry.getID() )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .persistent()
- .build();
+ .withStringKeys()
+ .withValues(new VisibilityBindingSetSerde())
+ .persistent()
+ .build();
builder.addStateStore(joinStoreSupplier, entry.getID());
}
}
@@ -459,6 +463,16 @@
super.meet(node);
}
+ @Override
+ public void meet(final Group node) throws TopologyBuilderException {
+ final String id = AGGREGATION_PREFIX + UUID.randomUUID();
+ final Optional<Side> side = getSide(node);
+ final AggregationProcessorSupplier supplier = new AggregationProcessorSupplier(id, node, (result) -> getResult(side, result));
+ entries.add( new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(node.getArg())) );
+ idMap.put(node, id);
+ super.meet(node);
+ }
+
/**
* Gets the {@link Side} the current node in the visitor is on relative to the provided node.
* @param node - The node used to determine the side of the current visitor node.
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
new file mode 100644
index 0000000..ccf5c0c
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.aggregation;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier.AggregationProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Integration tests {@link AggregationProcessor}.
+ */
+public class AggregationProcessorIT {
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
+
+ @Test
+ public void count() throws Exception {
+ // A query that figures out how many books each person has.
+ final String sparql =
+ "SELECT ?person (count(?book) as ?bookCount) " +
+ "WHERE { " +
+ "?person <urn:hasBook> ?book " +
+ "} GROUP BY ?person";
+
+ // Create the statements that will be input into the query..
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "a"));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 2")), "b"));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Bob"));
+ bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("bookCount", vf.createLiteral("2", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, "a&b"));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void sum() throws Exception {
+ // A query that figures out how much food each person has.
+ final String sparql =
+ "SELECT ?person (sum(?foodCount) as ?totalFood) " +
+ "WHERE { " +
+ "?person <urn:hasFoodType> ?food . " +
+ "?food <urn:count> ?foodCount . " +
+ "} GROUP BY ?person";
+
+ // Create the statements that will be input into the query..
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:corn")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:apple")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:corn"), vf.createURI("urn:count"), vf.createLiteral(4)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:count"), vf.createLiteral(3)), ""));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("totalFood", vf.createLiteral("4", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("totalFood", vf.createLiteral("7", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void average() throws Exception {
+ // A query that figures out the average age across all people.
+ final String sparql =
+ "SELECT (avg(?age) as ?avgAge) " +
+ "WHERE { " +
+ "?person <urn:age> ?age " +
+ "}";
+
+ // Create the statements that will be input into the query..
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(3)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(2)), ""));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("avgAge", vf.createLiteral("3", XMLSchema.DECIMAL));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("avgAge", vf.createLiteral("5", XMLSchema.DECIMAL));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("avgAge", vf.createLiteral("4", XMLSchema.DECIMAL));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void min() throws Exception {
+ // A query that figures out what the youngest age is across all people.
+ final String sparql =
+ "SELECT (min(?age) as ?youngest) " +
+ "WHERE { " +
+ "?person <urn:age> ?age " +
+ "}";
+
+ // Create the statements that will be input into the query..
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("youngest", vf.createLiteral(13));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("youngest", vf.createLiteral(7));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("youngest", vf.createLiteral(5));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void max() throws Exception {
+ // A query that figures out what the oldest age is across all people.
+ final String sparql =
+ "SELECT (max(?age) as ?oldest) " +
+ "WHERE { " +
+ "?person <urn:age> ?age " +
+ "}";
+
+ // Create the statements that will be input into the query..
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("oldest", vf.createLiteral(13));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("oldest", vf.createLiteral(14));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("oldest", vf.createLiteral(25));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void multipleGroupByVars() throws Exception {
+ // A query that contains more than one group by variable.
+ final String sparql =
+ "SELECT ?business ?employee (sum(?hours) AS ?totalHours) " +
+ "WHERE {" +
+ "?employee <urn:worksAt> ?business . " +
+ "?business <urn:hasTimecardId> ?timecardId . " +
+ "?employee <urn:hasTimecardId> ?timecardId . " +
+ "?timecardId <urn:hours> ?hours . " +
+ "} GROUP BY ?business ?employee";
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:timecard1"), vf.createURI("urn:hours"), vf.createLiteral(40)), ""));
+
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:timecard2"), vf.createURI("urn:hours"), vf.createLiteral(25)), ""));
+
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:timecard3"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
+
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:timecard4"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
+
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:CoffeeShop"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:timecard5"), vf.createURI("urn:hours"), vf.createLiteral(12)), ""));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+ bs.addBinding("employee", vf.createURI("urn:Alice"));
+ bs.addBinding("totalHours", vf.createLiteral("40", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+ bs.addBinding("employee", vf.createURI("urn:Alice"));
+ bs.addBinding("totalHours", vf.createLiteral("65", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+ bs.addBinding("employee", vf.createURI("urn:Bob"));
+ bs.addBinding("totalHours", vf.createLiteral("28", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+ bs.addBinding("employee", vf.createURI("urn:Bob"));
+ bs.addBinding("totalHours", vf.createLiteral("56", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("business", vf.createURI("urn:CoffeeShop"));
+ bs.addBinding("employee", vf.createURI("urn:Alice"));
+ bs.addBinding("totalHours", vf.createLiteral("12", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 4000, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void multipleAggregations() throws Exception {
+ // A query that figures out what the youngest and oldest ages are across all people.
+ final String sparql =
+ "SELECT (min(?age) as ?youngest) (max(?age) as ?oldest) " +
+ "WHERE { " +
+ "?person <urn:age> ?age " +
+ "}";
+
+ // Create the statements that will be input into the query..
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("youngest", vf.createLiteral(13));
+ bs.addBinding("oldest", vf.createLiteral(13));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("youngest", vf.createLiteral(13));
+ bs.addBinding("oldest", vf.createLiteral(14));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("youngest", vf.createLiteral(7));
+ bs.addBinding("oldest", vf.createLiteral(14));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("youngest", vf.createLiteral(5));
+ bs.addBinding("oldest", vf.createLiteral(14));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("youngest", vf.createLiteral(5));
+ bs.addBinding("oldest", vf.createLiteral(25));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+}
\ No newline at end of file