RYA-462 Updated the Kafka topic name for StreamsQueries to include the Rya Instance name.
Conflicts:
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
index 951d060..0e70391 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
@@ -37,18 +37,20 @@
/**
* Stream all of the results that have been produced by a query.
*
+ * @param ryaInstance - The name of the Rya Instance the query is for. (not null)
* @param queryId - Indicates which query results to stream. (not null)
* @return A {@link QueryResultStream} that starts with the first result that was ever produced.
* @throws RyaStreamsException Could not create the result stream.
*/
- public QueryResultStream<T> fromStart(UUID queryId) throws RyaStreamsException;
+ public QueryResultStream<T> fromStart(String ryaInstance, UUID queryId) throws RyaStreamsException;
/**
* Stream results that have been produced by a query after this method was invoked.
*
+ * @param ryaInstance - The name of the Rya Instance the query is for. (not null)
* @param queryId - Indicates which query results to stream. (not null)
* @return A {@link QueryResultStream} that only returns results that were produced after this method is invoked.
* @throws RyaStreamsException Could not create the result stream.
*/
- public QueryResultStream<T> fromNow(UUID queryId) throws RyaStreamsException;
+ public QueryResultStream<T> fromNow(String ryaInstance, UUID queryId) throws RyaStreamsException;
}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index 7b311f6..f9a9458 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -135,7 +135,7 @@
// Make sure the topics required by the application exists for the specified Rya instances.
final Set<String> topics = new HashSet<>();
topics.add( KafkaTopics.statementsTopic(params.ryaInstance) );
- topics.add( KafkaTopics.queryResultsTopic(queryId) );
+ topics.add( KafkaTopics.queryResultsTopic(params.ryaInstance, queryId) );
KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1);
// Run the query that uses those topics.
@@ -143,7 +143,7 @@
params.kafkaIP,
params.kafkaPort,
KafkaTopics.statementsTopic(params.ryaInstance),
- KafkaTopics.queryResultsTopic(queryId),
+ KafkaTopics.queryResultsTopic(params.ryaInstance, queryId),
queryRepo,
new TopologyFactory());
runQuery.run(queryId);
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 8ae4e08..783aedc 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,18 +20,11 @@
import static java.util.Objects.requireNonNull;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.api.utils.QueryInvestigator;
import org.apache.rya.streams.api.entity.QueryResultStream;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.interactor.GetQueryResultStream;
@@ -39,13 +32,13 @@
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryRepository;
import org.apache.rya.streams.client.RyaStreamsCommand;
-import org.apache.rya.streams.client.util.QueryResultsOutputUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Reduced;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.parser.sparql.SPARQLParser;
@@ -72,20 +65,14 @@
@Parameter(names = {"--queryId", "-q"}, required = true, description = "The query whose results will be streamed to the console.")
private String queryId;
- @Parameter(names = {"--file", "-f"}, required = false, description = "If provided, the output file the results will stream into.")
- private String outputPath;
-
@Override
public String toString() {
final StringBuilder parameters = new StringBuilder();
parameters.append(super.toString());
if (!Strings.isNullOrEmpty(queryId)) {
- parameters.append("\tQuery ID: " + queryId + "\n");
- }
-
- if(!Strings.isNullOrEmpty(outputPath)) {
- parameters.append("\tOutput Path: " + outputPath + "\n");
+ parameters.append("\tQuery ID: " + queryId);
+ parameters.append("\n");
}
return parameters.toString();
@@ -173,12 +160,10 @@
});
// Build the interactor based on the type of result the query produces.
- boolean isStatementResults = false;
-
final GetQueryResultStream<?> getQueryResultStream;
try {
- isStatementResults = QueryInvestigator.isConstruct(sparql) | QueryInvestigator.isInsertWhere(sparql);
- if(isStatementResults) {
+ final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
+ if(tupleExpr instanceof Reduced) {
getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityStatementDeserializer.class);
} else {
getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityBindingSetDeserializer.class);
@@ -187,22 +172,12 @@
throw new ExecutionException("Could not parse the SPARQL for the query: " + sparql, e);
}
- // Iterate through the results and print them to the configured output mechanism.
- try (final QueryResultStream<?> resultsStream = getQueryResultStream.fromStart(queryId)) {
- final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
- if(params.outputPath != null) {
- final Path file = Paths.get(params.outputPath);
- try (final OutputStream out = Files.newOutputStream(file)) {
- if(isStatementResults) {
- final QueryResultStream<VisibilityStatement> stmtStream = (QueryResultStream<VisibilityStatement>) resultsStream;
- QueryResultsOutputUtil.toNtriplesFile(out, stmtStream, finished);
- } else {
- final QueryResultStream<VisibilityBindingSet> bsStream = (QueryResultStream<VisibilityBindingSet>) resultsStream;
- QueryResultsOutputUtil.toBindingSetJSONFile(out, tupleExpr, bsStream, finished);
- }
+ // Iterate through the results and print them to the console until the program or the stream ends.
+ try (final QueryResultStream<?> stream = getQueryResultStream.fromStart(params.ryaInstance, queryId)) {
+ while(!finished.get()) {
+ for(final Object result : stream.poll(1000)) {
+ System.out.println(result);
}
- } else {
- streamToSystemOut(resultsStream, finished);
}
} catch (final Exception e) {
System.err.println("Error while reading the results from the stream.");
@@ -210,15 +185,4 @@
System.exit(1);
}
}
-
- private static void streamToSystemOut(final QueryResultStream<?> stream, final AtomicBoolean shutdownSignal) throws Exception {
- requireNonNull(stream);
- requireNonNull(shutdownSignal);
-
- while(!shutdownSignal.get()) {
- for(final Object result : stream.poll(1000)) {
- System.out.println(result);
- }
- }
- }
-}
\ No newline at end of file
+}
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 5d63f32..176b920 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -184,7 +184,7 @@
loadStatements.fromCollection(statements);
// Read the output of the streams program.
- final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, sQuery.getQueryId());
resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
} finally {
diff --git a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
index c090afa..17a290a 100644
--- a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
+++ b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
@@ -88,7 +88,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
index a4e4a3e..2a1e760 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
@@ -34,7 +34,6 @@
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
@@ -90,7 +89,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -137,7 +136,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -183,7 +182,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -233,7 +232,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -283,7 +282,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -371,7 +370,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
@@ -380,9 +379,7 @@
RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
- // Ignored because this test is kind of flakey.
@Test
- @Ignore
public void multipleAggregations() throws Exception {
// A query that figures out what the youngest and oldest ages are across all people.
final String sparql =
@@ -436,7 +433,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
index aaa67ea..7fb228a 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
@@ -54,7 +54,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
index 6e27669..11637b7 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
@@ -85,7 +85,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
@@ -119,7 +119,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
@@ -153,7 +153,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
@@ -187,7 +187,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Get the RDF model objects that will be used to build the query.
final String sparql =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index bdb9be6..5f09372 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -69,7 +69,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final String query =
@@ -120,7 +120,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final String query =
@@ -171,7 +171,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final String query =
@@ -228,7 +228,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final String query =
@@ -269,7 +269,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final String query =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
index a560294..c6fd1cf 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -28,7 +28,6 @@
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier;
import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -54,7 +53,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Create a topology for the Query that will be tested.
final String sparql =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
index 322bba9..f53f2c4 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -30,7 +30,6 @@
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -57,7 +56,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Create a topology for the Query that will be tested.
final String sparql =
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
index ba11e57..fd0a48d 100644
--- a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
@@ -30,7 +30,6 @@
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -55,7 +54,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
@@ -85,7 +84,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
@@ -123,7 +122,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final String query = "SELECT * WHERE { "
@@ -157,7 +156,7 @@
final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Setup a topology.
final String query = "SELECT * WHERE { "
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index 989799a..f0cc842 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -62,7 +62,7 @@
* @param changeLogTopic - The topic to evaluate. (not null)
* @return If the topic is well formatted, then the Rya instance name that was part of the topic name.
*/
- public static Optional<String> getRyaInstance(final String changeLogTopic) {
+ public static Optional<String> getRyaInstanceFromQueryChangeLog(final String changeLogTopic) {
requireNonNull(changeLogTopic);
// Return absent if the provided topic does not represent a query change log topic.
@@ -93,9 +93,31 @@
* @param queryId - The id of the query the topic is for.
* @return The name of the Kafka topic.
*/
- public static String queryResultsTopic(final UUID queryId) {
+ public static String queryResultsTopic(final String ryaInstance, final UUID queryId) {
requireNonNull(queryId);
- return "QueryResults-" + queryId.toString();
+ return ryaInstance + "-QueryResults-" + queryId.toString();
+ }
+
+ /**
+ * TODO doc
+ *
+ * @param queryResultsTopic
+ * @return
+ */
+ public static String getRyaInstanceFromQueryResultsTopic(final String queryResultsTopic) {
+ // TODO
+ return "";
+ }
+
+ /**
+ * TODO doc
+ *
+ * @param queryResultsTopic
+ * @return
+ */
+ public static UUID getQueryIdFromQueryResultsTopic(final String queryResultsTopic) {
+ // TODO
+ return null;
}
/**
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
index 8093951..63d64b9 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
@@ -78,7 +78,7 @@
// Setup the topology that processes the Query.
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId());
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, query.getQueryId());
try {
final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory());
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
index 529b493..d91edeb 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
@@ -66,22 +66,22 @@
}
@Override
- public QueryResultStream<T> fromStart(final UUID queryId) throws RyaStreamsException {
+ public QueryResultStream<T> fromStart(final String ryaInstance, final UUID queryId) throws RyaStreamsException {
requireNonNull(queryId);
// Always start at the earliest point within the topic.
- return makeStream(queryId, "earliest");
+ return makeStream(ryaInstance, queryId, "earliest");
}
@Override
- public QueryResultStream<T> fromNow(final UUID queryId) throws RyaStreamsException {
+ public QueryResultStream<T> fromNow(final String ryaInstance, final UUID queryId) throws RyaStreamsException {
requireNonNull(queryId);
// Always start at the latest point within the topic.
- return makeStream(queryId, "latest");
+ return makeStream(ryaInstance, queryId, "latest");
}
- private QueryResultStream<T> makeStream(final UUID queryId, final String autoOffsetResetConfig) {
+ private QueryResultStream<T> makeStream(final String ryaInstance, final UUID queryId, final String autoOffsetResetConfig) {
// Configure which instance of Kafka to connect to.
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -106,7 +106,7 @@
final KafkaConsumer<String, T> consumer = new KafkaConsumer<>(props);
// Register the consumer for the query's results.
- final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
consumer.subscribe(Arrays.asList(resultTopic));
// Return the result stream.
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
index a057de7..8eaf080 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
@@ -37,7 +37,7 @@
final String topicName = KafkaTopics.queryChangeLogTopic(ryaInstance);
// Show the rya instance name is able to be extracted from the topic.
- final Optional<String> resolvedRyaInstance = KafkaTopics.getRyaInstance(topicName);
+ final Optional<String> resolvedRyaInstance = KafkaTopics.getRyaInstanceFromQueryChangeLog(topicName);
assertEquals(ryaInstance, resolvedRyaInstance.get());
}
@@ -47,7 +47,7 @@
final String invalidTopic = "thisIsABadTopicName";
// Show there is no Rya Instance name in it.
- final Optional<String> ryaInstance = KafkaTopics.getRyaInstance(invalidTopic);
+ final Optional<String> ryaInstance = KafkaTopics.getRyaInstanceFromQueryChangeLog(invalidTopic);
assertFalse( ryaInstance.isPresent() );
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
index 8882753..59c08b7 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
@@ -85,6 +85,7 @@
@Test
public void fromStart() throws Exception {
// Create an ID for the query.
+ final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
// Create a list of test VisibilityBindingSets.
@@ -106,7 +107,7 @@
// Write some entries to the query result topic in Kafka.
try(final Producer<?, VisibilityBindingSet> producer =
KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) {
- final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
for(final VisibilityBindingSet visBs : original) {
producer.send(new ProducerRecord<>(resultTopic, visBs));
}
@@ -115,7 +116,7 @@
// Use the interactor that is being tested to read all of the visibility binding sets.
final GetQueryResultStream<VisibilityBindingSet> interactor =
new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
- final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));
+ final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(ryaInstance, queryId));
// Show the fetched binding sets match the original, as well as their order.
assertEquals(original, read);
@@ -124,11 +125,12 @@
@Test
public void fromNow() throws Exception {
// Create an ID for the query.
+ final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
try(final Producer<?, VisibilityBindingSet> producer =
KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) {
- final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// Write a single visibility binding set to the query's result topic. This will not appear in the expected results.
final ValueFactory vf = new ValueFactoryImpl();
@@ -140,7 +142,7 @@
// Use the interactor that is being tested to read all of the visibility binding sets that appear after this point.
final GetQueryResultStream<VisibilityBindingSet> interactor =
new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
- try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(queryId)) {
+ try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(ryaInstance, queryId)) {
// Read results from the stream.
List<VisibilityBindingSet> read = new ArrayList<>();
for(final VisibilityBindingSet visBs : results.poll(500)) {
@@ -178,12 +180,13 @@
@Test(expected = IllegalStateException.class)
public void pollClosedStream() throws Exception {
// Create an ID for the query.
+ final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
// Use the interactor that is being tested to create a result stream and immediately close it.
final GetQueryResultStream<VisibilityBindingSet> interactor =
new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
- final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(queryId);
+ final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(ryaInstance, queryId);
results.close();
// Try to poll the closed stream.
@@ -193,6 +196,7 @@
@Test
public void fromStart_visibilityStatements() throws Exception {
// Create an ID for the query.
+ final String ryaInstance = UUID.randomUUID().toString();
final UUID queryId = UUID.randomUUID();
// Create some statements that will be written to the result topic.
@@ -205,7 +209,7 @@
// Write the entries to the query result topic in Kafka.
try(final Producer<?, VisibilityStatement> producer =
KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
- final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
for(final VisibilityStatement visStmt : original) {
producer.send(new ProducerRecord<>(resultTopic, visStmt));
}
@@ -214,7 +218,7 @@
// Use the interactor that is being tested to read all of the visibility binding sets.
final GetQueryResultStream<VisibilityStatement> interactor =
new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityStatementDeserializer.class);
- final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));
+ final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(ryaInstance, queryId));
// Show the fetched binding sets match the original, as well as their order.
assertEquals(original, read);
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
index 5dbd27f..c9abb41 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -90,7 +90,7 @@
// Add the query to the query repository.
final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
final UUID queryId = sQuery.getQueryId();
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
// The thread that will run the tested interactor.
final Thread testThread = new Thread() {
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
index e746baf..2aa7054 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
@@ -171,11 +171,11 @@
final Set<String> changeLogTopics = new HashSet<>( listTopicsConsumer.listTopics().keySet() );
// Remove all topics that are not valid Rya Query Change Log topic names.
- changeLogTopics.removeIf( topic -> !KafkaTopics.getRyaInstance(topic).isPresent() );
+ changeLogTopics.removeIf( topic -> !KafkaTopics.getRyaInstanceFromQueryChangeLog(topic).isPresent() );
// Extract the Rya instance names from the change log topics.
final Set<String> ryaInstances = changeLogTopics.stream()
- .map(topic -> KafkaTopics.getRyaInstance(topic).get() )
+ .map(topic -> KafkaTopics.getRyaInstanceFromQueryChangeLog(topic).get() )
.collect(Collectors.toSet());
// Any Rya instances that are in the old set of topics, but not the new one, have been deleted.
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
index 3a59636..4bd022a 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -128,7 +128,7 @@
// Make sure the Statements topic exists for the query.
final Set<String> topics = Sets.newHashSet(
KafkaTopics.statementsTopic(ryaInstance),
- KafkaTopics.queryResultsTopic(query.getQueryId()));
+ KafkaTopics.queryResultsTopic(ryaInstance, query.getQueryId()));
// Make sure the Query Results topic exists for the query.
createKafkaTopic.createTopics(topics, 1, 1);
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index f9c8a03..6358104 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -138,7 +138,7 @@
loadStatements.fromCollection(statements);
// Read the output of the streams program.
- final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+ final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, sQuery.getQueryId());
resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
final List<VisibilityBindingSet> results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
assertEquals(expected, results);