RYA-462 Updated the Kafka topic name for StreamsQueries to include the Rya Instance name.

Conflicts:
	extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
index 951d060..0e70391 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
@@ -37,18 +37,20 @@
     /**
      * Stream all of the results that have been produced by a query.
      *
+     * @param ryaInstance - The name of the Rya Instance the query is for. (not null)
      * @param queryId - Indicates which query results to stream. (not null)
      * @return A {@link QueryResultStream} that starts with the first result that was ever produced.
      * @throws RyaStreamsException Could not create the result stream.
      */
-    public QueryResultStream<T> fromStart(UUID queryId) throws RyaStreamsException;
+    public QueryResultStream<T> fromStart(String ryaInstance, UUID queryId) throws RyaStreamsException;
 
     /**
      * Stream results that have been produced by a query after this method was invoked.
      *
+     * @param ryaInstance - The name of the Rya Instance the query is for. (not null)
      * @param queryId - Indicates which query results to stream. (not null)
      * @return A {@link QueryResultStream} that only returns results that were produced after this method is invoked.
      * @throws RyaStreamsException Could not create the result stream.
      */
-    public QueryResultStream<T> fromNow(UUID queryId) throws RyaStreamsException;
+    public QueryResultStream<T> fromNow(String ryaInstance, UUID queryId) throws RyaStreamsException;
 }
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index 7b311f6..f9a9458 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -135,7 +135,7 @@
                 // Make sure the topics required by the application exists for the specified Rya instances.
                 final Set<String> topics = new HashSet<>();
                 topics.add( KafkaTopics.statementsTopic(params.ryaInstance) );
-                topics.add( KafkaTopics.queryResultsTopic(queryId) );
+                topics.add( KafkaTopics.queryResultsTopic(params.ryaInstance, queryId) );
                 KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1);
 
                 // Run the query that uses those topics.
@@ -143,7 +143,7 @@
                         params.kafkaIP,
                         params.kafkaPort,
                         KafkaTopics.statementsTopic(params.ryaInstance),
-                        KafkaTopics.queryResultsTopic(queryId),
+                        KafkaTopics.queryResultsTopic(params.ryaInstance, queryId),
                         queryRepo,
                         new TopologyFactory());
                 runQuery.run(queryId);
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 8ae4e08..783aedc 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,18 +20,11 @@
 
 import static java.util.Objects.requireNonNull;
 
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.api.utils.QueryInvestigator;
 import org.apache.rya.streams.api.entity.QueryResultStream;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.interactor.GetQueryResultStream;
@@ -39,13 +32,13 @@
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryRepository;
 import org.apache.rya.streams.client.RyaStreamsCommand;
-import org.apache.rya.streams.client.util.QueryResultsOutputUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
 import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Reduced;
 import org.openrdf.query.algebra.TupleExpr;
 import org.openrdf.query.parser.sparql.SPARQLParser;
 
@@ -72,20 +65,14 @@
         @Parameter(names = {"--queryId", "-q"}, required = true, description = "The query whose results will be streamed to the console.")
         private String queryId;
 
-        @Parameter(names = {"--file", "-f"}, required = false, description = "If provided, the output file the results will stream into.")
-        private String outputPath;
-
         @Override
         public String toString() {
             final StringBuilder parameters = new StringBuilder();
             parameters.append(super.toString());
 
             if (!Strings.isNullOrEmpty(queryId)) {
-                parameters.append("\tQuery ID: " + queryId + "\n");
-            }
-
-            if(!Strings.isNullOrEmpty(outputPath)) {
-                parameters.append("\tOutput Path: " + outputPath + "\n");
+                parameters.append("\tQuery ID: " + queryId);
+                parameters.append("\n");
             }
 
             return parameters.toString();
@@ -173,12 +160,10 @@
         });
 
         // Build the interactor based on the type of result the query produces.
-        boolean isStatementResults = false;
-
         final GetQueryResultStream<?> getQueryResultStream;
         try {
-            isStatementResults = QueryInvestigator.isConstruct(sparql) | QueryInvestigator.isInsertWhere(sparql);
-            if(isStatementResults) {
+            final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
+            if(tupleExpr instanceof Reduced) {
                 getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityStatementDeserializer.class);
             } else {
                 getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityBindingSetDeserializer.class);
@@ -187,22 +172,12 @@
             throw new ExecutionException("Could not parse the SPARQL for the query: " + sparql, e);
         }
 
-        // Iterate through the results and print them to the configured output mechanism.
-        try (final QueryResultStream<?> resultsStream = getQueryResultStream.fromStart(queryId)) {
-            final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
-            if(params.outputPath != null) {
-                final Path file = Paths.get(params.outputPath);
-                try (final OutputStream out = Files.newOutputStream(file)) {
-                    if(isStatementResults) {
-                        final QueryResultStream<VisibilityStatement> stmtStream = (QueryResultStream<VisibilityStatement>) resultsStream;
-                        QueryResultsOutputUtil.toNtriplesFile(out, stmtStream, finished);
-                    } else {
-                        final QueryResultStream<VisibilityBindingSet> bsStream = (QueryResultStream<VisibilityBindingSet>) resultsStream;
-                        QueryResultsOutputUtil.toBindingSetJSONFile(out, tupleExpr, bsStream, finished);
-                    }
+        // Iterate through the results and print them to the console until the program or the stream ends.
+        try (final QueryResultStream<?> stream = getQueryResultStream.fromStart(params.ryaInstance, queryId)) {
+            while(!finished.get()) {
+                for(final Object result : stream.poll(1000)) {
+                    System.out.println(result);
                 }
-            } else {
-                streamToSystemOut(resultsStream, finished);
             }
         } catch (final Exception e) {
             System.err.println("Error while reading the results from the stream.");
@@ -210,15 +185,4 @@
             System.exit(1);
         }
     }
-
-    private static void streamToSystemOut(final QueryResultStream<?> stream, final AtomicBoolean shutdownSignal) throws Exception {
-        requireNonNull(stream);
-        requireNonNull(shutdownSignal);
-
-        while(!shutdownSignal.get()) {
-            for(final Object result : stream.poll(1000)) {
-                System.out.println(result);
-            }
-        }
-    }
-}
\ No newline at end of file
+}
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 5d63f32..176b920 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -184,7 +184,7 @@
             loadStatements.fromCollection(statements);
 
             // Read the output of the streams program.
-            final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+            final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, sQuery.getQueryId());
             resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
             results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
         } finally {
diff --git a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
index c090afa..17a290a 100644
--- a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
+++ b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
@@ -88,7 +88,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
index a4e4a3e..2a1e760 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
@@ -34,7 +34,6 @@
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;
@@ -90,7 +89,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -137,7 +136,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -183,7 +182,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -233,7 +232,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -283,7 +282,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -371,7 +370,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -380,9 +379,7 @@
         RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
-    // Ignored because this test is kind of flakey.
     @Test
-    @Ignore
     public void multipleAggregations() throws Exception {
         // A query that figures out what the youngest and oldest ages are across all people.
         final String sparql =
@@ -436,7 +433,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
index aaa67ea..7fb228a 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
@@ -54,7 +54,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
index 6e27669..11637b7 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
@@ -85,7 +85,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =
@@ -119,7 +119,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =
@@ -153,7 +153,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =
@@ -187,7 +187,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Get the RDF model objects that will be used to build the query.
         final String sparql =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index bdb9be6..5f09372 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -69,7 +69,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query =
@@ -120,7 +120,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query =
@@ -171,7 +171,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query =
@@ -228,7 +228,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query =
@@ -269,7 +269,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
index a560294..c6fd1cf 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -28,7 +28,6 @@
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier;
 import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -54,7 +53,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Create a topology for the Query that will be tested.
         final String sparql =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
index 322bba9..f53f2c4 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -30,7 +30,6 @@
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
 import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -57,7 +56,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Create a topology for the Query that will be tested.
         final String sparql =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
index ba11e57..fd0a48d 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
@@ -30,7 +30,6 @@
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
 import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -55,7 +54,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
@@ -85,7 +84,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
@@ -123,7 +122,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query = "SELECT * WHERE { "
@@ -157,7 +156,7 @@
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // Setup a topology.
         final String query = "SELECT * WHERE { "
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index 989799a..f0cc842 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -62,7 +62,7 @@
      * @param changeLogTopic - The topic to evaluate. (not null)
      * @return If the topic is well formatted, then the Rya instance name that was part of the topic name.
      */
-    public static Optional<String> getRyaInstance(final String changeLogTopic) {
+    public static Optional<String> getRyaInstanceFromQueryChangeLog(final String changeLogTopic) {
         requireNonNull(changeLogTopic);
 
         // Return absent if the provided topic does not represent a query change log topic.
@@ -93,9 +93,31 @@
      * @param queryId - The id of the query the topic is for.
      * @return The name of the Kafka topic.
      */
-    public static String queryResultsTopic(final UUID queryId) {
+    public static String queryResultsTopic(final String ryaInstance, final UUID queryId) {
         requireNonNull(queryId);
-        return "QueryResults-" + queryId.toString();
+        return ryaInstance + "-QueryResults-" + queryId.toString();
+    }
+
+    /**
+     * TODO doc
+     *
+     * @param queryResultsTopic
+     * @return
+     */
+    public static String getRyaInstanceFromQueryResultsTopic(final String queryResultsTopic) {
+        // TODO
+        return "";
+    }
+
+    /**
+     * TODO doc
+     *
+     * @param queryResultsTopic
+     * @return
+     */
+    public static UUID getQueryIdFromQueryResultsTopic(final String queryResultsTopic) {
+        // TODO
+        return null;
     }
 
     /**
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
index 8093951..63d64b9 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
@@ -78,7 +78,7 @@
 
         // Setup the topology that processes the Query.
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId());
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, query.getQueryId());
 
         try {
             final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory());
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
index 529b493..d91edeb 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
@@ -66,22 +66,22 @@
     }
 
     @Override
-    public QueryResultStream<T> fromStart(final UUID queryId) throws RyaStreamsException {
+    public QueryResultStream<T> fromStart(final String ryaInstance, final UUID queryId) throws RyaStreamsException {
         requireNonNull(queryId);
 
         // Always start at the earliest point within the topic.
-        return makeStream(queryId, "earliest");
+        return makeStream(ryaInstance, queryId, "earliest");
     }
 
     @Override
-    public QueryResultStream<T> fromNow(final UUID queryId) throws RyaStreamsException {
+    public QueryResultStream<T> fromNow(final String ryaInstance, final UUID queryId) throws RyaStreamsException {
         requireNonNull(queryId);
 
         // Always start at the latest point within the topic.
-        return makeStream(queryId, "latest");
+        return makeStream(ryaInstance, queryId, "latest");
     }
 
-    private QueryResultStream<T> makeStream(final UUID queryId, final String autoOffsetResetConfig) {
+    private QueryResultStream<T> makeStream(final String ryaInstance, final UUID queryId, final String autoOffsetResetConfig) {
         // Configure which instance of Kafka to connect to.
         final Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -106,7 +106,7 @@
         final KafkaConsumer<String, T> consumer = new KafkaConsumer<>(props);
 
         // Register the consumer for the query's results.
-        final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
         consumer.subscribe(Arrays.asList(resultTopic));
 
         // Return the result stream.
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
index a057de7..8eaf080 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
@@ -37,7 +37,7 @@
         final String topicName = KafkaTopics.queryChangeLogTopic(ryaInstance);
 
         // Show the rya instance name is able to be extracted from the topic.
-        final Optional<String> resolvedRyaInstance = KafkaTopics.getRyaInstance(topicName);
+        final Optional<String> resolvedRyaInstance = KafkaTopics.getRyaInstanceFromQueryChangeLog(topicName);
         assertEquals(ryaInstance, resolvedRyaInstance.get());
     }
 
@@ -47,7 +47,7 @@
         final String invalidTopic = "thisIsABadTopicName";
 
         // Show there is no Rya Instance name in it.
-        final Optional<String> ryaInstance = KafkaTopics.getRyaInstance(invalidTopic);
+        final Optional<String> ryaInstance = KafkaTopics.getRyaInstanceFromQueryChangeLog(invalidTopic);
         assertFalse( ryaInstance.isPresent() );
     }
 }
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
index 8882753..59c08b7 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
@@ -85,6 +85,7 @@
     @Test
     public void fromStart() throws Exception {
         // Create an ID for the query.
+        final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
 
         // Create a list of test VisibilityBindingSets.
@@ -106,7 +107,7 @@
         // Write some entries to the query result topic in Kafka.
         try(final Producer<?, VisibilityBindingSet> producer =
                 KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) {
-            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+            final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
             for(final VisibilityBindingSet visBs : original) {
                 producer.send(new ProducerRecord<>(resultTopic, visBs));
             }
@@ -115,7 +116,7 @@
         // Use the interactor that is being tested to read all of the visibility binding sets.
         final GetQueryResultStream<VisibilityBindingSet> interactor =
                 new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
-        final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));
+        final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(ryaInstance, queryId));
 
         // Show the fetched binding sets match the original, as well as their order.
         assertEquals(original, read);
@@ -124,11 +125,12 @@
     @Test
     public void fromNow() throws Exception {
         // Create an ID for the query.
+        final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
 
         try(final Producer<?, VisibilityBindingSet> producer =
                 KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) {
-            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+            final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
             // Write a single visibility binding set to the query's result topic. This will not appear in the expected results.
             final ValueFactory vf = new ValueFactoryImpl();
@@ -140,7 +142,7 @@
             // Use the interactor that is being tested to read all of the visibility binding sets that appear after this point.
             final GetQueryResultStream<VisibilityBindingSet> interactor =
                     new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
-            try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(queryId)) {
+            try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(ryaInstance, queryId)) {
                 // Read results from the stream.
                 List<VisibilityBindingSet> read = new ArrayList<>();
                 for(final VisibilityBindingSet visBs : results.poll(500)) {
@@ -178,12 +180,13 @@
     @Test(expected = IllegalStateException.class)
     public void pollClosedStream() throws Exception {
         // Create an ID for the query.
+        final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
 
         // Use the interactor that is being tested to create a result stream and immediately close it.
         final GetQueryResultStream<VisibilityBindingSet> interactor =
                 new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
-        final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(queryId);
+        final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(ryaInstance, queryId);
         results.close();
 
         // Try to poll the closed stream.
@@ -193,6 +196,7 @@
     @Test
     public void fromStart_visibilityStatements() throws Exception {
         // Create an ID for the query.
+        final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
 
         // Create some statements that will be written to the result topic.
@@ -205,7 +209,7 @@
         // Write the entries to the query result topic in Kafka.
         try(final Producer<?, VisibilityStatement> producer =
                 KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
-            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+            final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
             for(final VisibilityStatement visStmt : original) {
                 producer.send(new ProducerRecord<>(resultTopic, visStmt));
             }
@@ -214,7 +218,7 @@
         // Use the interactor that is being tested to read all of the visibility binding sets.
         final GetQueryResultStream<VisibilityStatement> interactor =
                 new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityStatementDeserializer.class);
-        final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));
+        final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(ryaInstance, queryId));
 
         // Show the fetched binding sets match the original, as well as their order.
         assertEquals(original, read);
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
index 5dbd27f..c9abb41 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -90,7 +90,7 @@
         // Add the query to the query repository.
         final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
         final UUID queryId = sQuery.getQueryId();
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 
         // The thread that will run the tested interactor.
         final Thread testThread = new Thread() {
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
index e746baf..2aa7054 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
@@ -171,11 +171,11 @@
             final Set<String> changeLogTopics = new HashSet<>( listTopicsConsumer.listTopics().keySet() );
 
             // Remove all topics that are not valid Rya Query Change Log topic names.
-            changeLogTopics.removeIf( topic -> !KafkaTopics.getRyaInstance(topic).isPresent() );
+            changeLogTopics.removeIf( topic -> !KafkaTopics.getRyaInstanceFromQueryChangeLog(topic).isPresent() );
 
             // Extract the Rya instance names from the change log topics.
             final Set<String> ryaInstances = changeLogTopics.stream()
-                    .map(topic -> KafkaTopics.getRyaInstance(topic).get() )
+                    .map(topic -> KafkaTopics.getRyaInstanceFromQueryChangeLog(topic).get() )
                     .collect(Collectors.toSet());
 
             // Any Rya instances that are in the old set of topics, but not the new one, have been deleted.
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
index 3a59636..4bd022a 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -128,7 +128,7 @@
             // Make sure the Statements topic exists for the query.
             final Set<String> topics = Sets.newHashSet(
                     KafkaTopics.statementsTopic(ryaInstance),
-                    KafkaTopics.queryResultsTopic(query.getQueryId()));
+                    KafkaTopics.queryResultsTopic(ryaInstance, query.getQueryId()));
 
             // Make sure the Query Results topic exists for the query.
             createKafkaTopic.createTopics(topics, 1, 1);
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index f9c8a03..6358104 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -138,7 +138,7 @@
             loadStatements.fromCollection(statements);
 
             // Read the output of the streams program.
-            final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+            final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, sQuery.getQueryId());
             resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
             final List<VisibilityBindingSet> results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
             assertEquals(expected, results);