RYA-377 Implemented integration tests for the client commands.
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java
index 82ca691..946944f 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java
@@ -38,10 +38,9 @@
private final QueryRepository repository;
/**
- * Creates a new {@link DefaultAddQuery}.
+ * Creates a new {@link DefaultListQueries}.
*
- * @param repository - The {@link QueryRepository} to add a query to. (not
- * null)
+ * @param repository - The {@link QueryRepository} that hosts the listed queries. (not null)
*/
public DefaultListQueries(final QueryRepository repository) {
this.repository = requireNonNull(repository);
@@ -51,4 +50,4 @@
public Set<StreamsQuery> all() throws RyaStreamsException {
return repository.list();
}
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
index 967b79e..5d64785 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
@@ -26,23 +26,22 @@
import edu.umd.cs.findbugs.annotations.NonNull;
/**
- * A command that may be executed by the {@link PcjAdminClient}.
+ * A command that may be executed by the Rya Streams {@link CLIDriver}.
*/
@DefaultAnnotation(NonNull.class)
public interface RyaStreamsCommand {
+
/**
- * Command line parameters that are used by this command to configure
- * itself.
+ * Command line parameters that are used by all commands that interact with Kafka.
*/
- class Parameters {
- @Parameter(names = { "--topic",
- "-t" }, required = true, description = "The kafka topic to load the statements into.")
- public String topicName;
- @Parameter(names = { "--kafkaPort",
- "-p" }, required = true, description = "The port to use to connect to Kafka.")
+ class KafkaParameters {
+ @Parameter(names = {"--ryaInstance", "-r"}, required = true, description = "The name of the Rya Instance the Rya Streams is a part of.")
+ public String ryaInstance;
+
+ @Parameter(names = { "--kafkaPort", "-p" }, required = true, description = "The port to use to connect to Kafka.")
public String kafkaPort;
- @Parameter(names = { "--kafkaHostname",
- "-i" }, required = true, description = "The IP or Hostname to use to connect to Kafka.")
+
+ @Parameter(names = { "--kafkaHostname", "-i" }, required = true, description = "The IP or Hostname to use to connect to Kafka.")
public String kafkaIP;
@Override
@@ -51,8 +50,8 @@
parameters.append("Parameters");
parameters.append("\n");
- if (!Strings.isNullOrEmpty(topicName)) {
- parameters.append("\tTopic: " + topicName);
+ if(!Strings.isNullOrEmpty(ryaInstance)) {
+ parameters.append("\tRya Instance Name: " + ryaInstance + "\n");
}
if (!Strings.isNullOrEmpty(kafkaIP)) {
@@ -82,7 +81,7 @@
* @return Describes what arguments may be provided to the command.
*/
default public String getUsage() {
- final JCommander parser = new JCommander(new Parameters());
+ final JCommander parser = new JCommander(new KafkaParameters());
final StringBuilder usage = new StringBuilder();
parser.usage(usage);
@@ -127,4 +126,4 @@
super(message, cause);
}
}
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
index dfaa6c6..8439f20 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
@@ -40,6 +40,7 @@
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryRepository;
import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
@@ -62,12 +63,10 @@
private static final Logger log = LoggerFactory.getLogger(AddQueryCommand.class);
/**
- * Command line parameters that are used by this command to configure
- * itself.
+ * Command line parameters that are used by this command to configure itself.
*/
- private class AddParameters extends RyaStreamsCommand.Parameters {
- @Parameter(names = { "--query",
- "-q" }, required = true, description = "The SPARQL query to add to Rya Streams.")
+ private class AddParameters extends RyaStreamsCommand.KafkaParameters {
+ @Parameter(names = { "--query", "-q" }, required = true, description = "The SPARQL query to add to Rya Streams.")
private String query;
@Override
@@ -121,15 +120,19 @@
producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties);
final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties);
- final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, params.topicName);
+ final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance));
final QueryRepository repo = new InMemoryQueryRepository(changeLog);
+
+ // Execute the add query command.
final AddQuery addQuery = new DefaultAddQuery(repo);
try {
final StreamsQuery query = addQuery.addQuery(params.query);
@@ -140,4 +143,4 @@
log.trace("Finished executing the Add Query Command.");
}
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
index 65a7017..b101a0f 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
@@ -40,6 +40,7 @@
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryRepository;
import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
@@ -62,10 +63,9 @@
private static final Logger log = LoggerFactory.getLogger(DeleteQueryCommand.class);
/**
- * Command line parameters that are used by this command to configure
- * itself.
+ * Command line parameters that are used by this command to configure itself.
*/
- private class RemoveParameters extends RyaStreamsCommand.Parameters {
+ private class RemoveParameters extends RyaStreamsCommand.KafkaParameters {
@Parameter(names = { "--queryID", "-q" }, required = true, description = "The ID of the query to remove from Rya Streams.")
private String queryId;
@@ -73,7 +73,6 @@
public String toString() {
final StringBuilder parameters = new StringBuilder();
parameters.append(super.toString());
- parameters.append("\n");
if (!Strings.isNullOrEmpty(queryId)) {
parameters.append("\tQueryID: " + queryId);
@@ -120,23 +119,27 @@
producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties);
final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties);
- final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, params.topicName);
+ final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance));
final QueryRepository repo = new InMemoryQueryRepository(changeLog);
+
+ // Execute the delete query command.
final DeleteQuery deleteQuery = new DefaultDeleteQuery(repo);
try {
deleteQuery.delete(UUID.fromString(params.queryId));
log.trace("Deleted query: " + params.queryId);
} catch (final RyaStreamsException e) {
- log.error("Unable to parse query: " + params.queryId, e);
+ log.error("Unable to delete query with ID: " + params.queryId, e);
}
- log.trace("Finished executing the Add Query Command.");
+ log.trace("Finished executing the Delete Query Command.");
}
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index ec40b50..c4e5de6 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -41,6 +41,7 @@
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryRepository;
import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
@@ -75,7 +76,7 @@
requireNonNull(args);
// Parse the command line arguments.
- final Parameters params = new Parameters();
+ final KafkaParameters params = new KafkaParameters();
try {
new JCommander(params, args);
} catch (final ParameterException e) {
@@ -88,25 +89,29 @@
producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties);
final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties);
- final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, params.topicName);
+ final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance));
final QueryRepository repo = new InMemoryQueryRepository(changeLog);
+
+ // Execute the list queries command.
final ListQueries listQueries = new DefaultListQueries(repo);
try {
final Set<StreamsQuery> queries = listQueries.all();
- logQueries(queries);
+ System.out.println( formatQueries(queries) );
} catch (final RyaStreamsException e) {
log.error("Unable to retrieve the queries.", e);
}
}
- private void logQueries(final Set<StreamsQuery> queries) {
+ private String formatQueries(final Set<StreamsQuery> queries) {
final StringBuilder sb = new StringBuilder();
sb.append("\n");
sb.append("Queries in Rya Streams:\n");
@@ -119,6 +124,6 @@
sb.append(query.getSparql());
sb.append("\n");
});
- log.trace(sb.toString());
+ return sb.toString();
}
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
index 057de77..4763bd8 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
@@ -27,10 +27,13 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.api.interactor.LoadStatements;
import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,9 +56,11 @@
/**
* Command line parameters that are used by this command to configure itself.
*/
- private static final class LoadStatementsParameters extends RyaStreamsCommand.Parameters {
+ private static final class LoadStatementsParameters extends RyaStreamsCommand.KafkaParameters {
+
@Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.")
private String statementsFile;
+
@Parameter(names= {"--visibilities", "-v"}, required = true, description = "The visibilities to assign to the statements being loaded in.")
private String visibilities;
@@ -63,7 +68,6 @@
public String toString() {
final StringBuilder parameters = new StringBuilder();
parameters.append(super.toString());
- parameters.append("\n");
if (!Strings.isNullOrEmpty(statementsFile)) {
parameters.append("\tStatements File: " + statementsFile);
@@ -117,7 +121,7 @@
final Properties producerProps = buildProperties(params);
try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(producerProps)) {
- final LoadStatements statements = new KafkaLoadStatements(params.topicName, producer);
+ final LoadStatements statements = new KafkaLoadStatements(KafkaTopics.statementsTopic(params.ryaInstance), producer);
statements.load(statementsPath, params.visibilities);
} catch (final Exception e) {
log.error("Unable to parse statements file: " + statementsPath.toString(), e);
@@ -130,6 +134,8 @@
requireNonNull(params);
final Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
+ props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityStatementSerializer.class.getName());
return props;
}
}
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index 6b13b46..09e874c 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -21,10 +21,30 @@
import static org.junit.Assert.assertEquals;
import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.apache.rya.test.kafka.KafkaITBase;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -33,7 +53,15 @@
* integration Test for adding a new query through a command.
*/
public class AddQueryCommandIT extends KafkaITBase {
- private String[] args;
+
+ private final String ryaInstance = UUID.randomUUID().toString();
+
+ private String kafkaIp;
+ private String kafkaPort;
+ private QueryRepository queryRepo;
+
+ private Producer<?, QueryChange> queryProducer = null;
+ private Consumer<?, QueryChange> queryConsumer = null;
@Rule
public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
@@ -43,19 +71,74 @@
final Properties props = rule.createBootstrapServerConfig();
final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
final String[] tokens = location.split(":");
- args = new String[] {
- "-q", "Some sparql query",
- "-t", rule.getKafkaTopicName(),
- "-p", tokens[1],
- "-i", tokens[0]
- };
+
+ kafkaIp = tokens[0];
+ kafkaPort = tokens[1];
+
+ // Initialize the QueryRepository.
+ final Properties producerProperties = new Properties();
+ producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+ producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
+ final Properties consumerProperties = new Properties();
+ consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+ consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
+ queryProducer = new KafkaProducer<>(producerProperties);
+ queryConsumer = new KafkaConsumer<>(consumerProperties);
+
+ final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+ final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+ queryRepo = new InMemoryQueryRepository(changeLog);
+ }
+
+ @After
+ public void cleanup() {
+ queryProducer.close();
+ queryConsumer.close();
}
@Test
- public void happyAddQueryTest() throws Exception {
+ public void shortParams() throws Exception {
+ // Arguments that add a query to Rya Streams.
+ final String query = "SELECT * WHERE { ?person <urn:name> ?name }";
+ final String[] args = new String[] {
+ "-r", "" + ryaInstance,
+ "-i", kafkaIp,
+ "-p", kafkaPort,
+ "-q", query
+ };
+
+ // Execute the command.
final AddQueryCommand command = new AddQueryCommand();
command.execute(args);
- // not sure what to assert here.
- assertEquals(true, true);
+
+ // Show that the query was added to the Query Repository.
+ final Set<StreamsQuery> queries = queryRepo.list();
+ assertEquals(1, queries.size());
+ assertEquals(query, queries.iterator().next().getSparql());
}
-}
+
+ @Test
+ public void longParams() throws Exception {
+ // Arguments that add a query to Rya Streams.
+ final String query = "SELECT * WHERE { ?person <urn:name> ?name }";
+ final String[] args = new String[] {
+ "--ryaInstance", "" + ryaInstance,
+ "--kafkaHostname", kafkaIp,
+ "--kafkaPort", kafkaPort,
+ "--query", query
+ };
+
+ // Execute the command.
+ final AddQueryCommand command = new AddQueryCommand();
+ command.execute(args);
+
+ // Show that the query was added to the Query Repository.
+ final Set<StreamsQuery> queries = queryRepo.list();
+ assertEquals(1, queries.size());
+ assertEquals(query, queries.iterator().next().getSparql());
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index db8c200..0079371 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -19,25 +19,49 @@
package org.apache.rya.streams.client.command;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.apache.rya.test.kafka.KafkaITBase;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import com.google.common.collect.Lists;
-
/**
* Integration Test for deleting a query from Rya Streams through a command.
*/
public class DeleteQueryCommandIT extends KafkaITBase {
- private List<String> args;
+
+ private final String ryaInstance = UUID.randomUUID().toString();
+
+ private String kafkaIp;
+ private String kafkaPort;
+
+ private Producer<?, QueryChange> queryProducer = null;
+ private Consumer<?, QueryChange> queryConsumer = null;
@Rule
public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
@@ -48,28 +72,107 @@
final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
final String[] tokens = location.split(":");
- args = Lists.newArrayList(
- "-t", rule.getKafkaTopicName(),
- "-p", tokens[1],
- "-i", tokens[0]
- );
+ kafkaIp = tokens[0];
+ kafkaPort = tokens[1];
+ }
+
+ /**
+ * This test simulates executing many commands, each of which uses its own InMemoryQueryRepository. We need
+ * to re-create the repo outside of the command to ensure it has the most up-to-date values inside of it.
+ */
+ private QueryRepository makeQueryRepository() {
+ final Properties producerProperties = new Properties();
+ producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+ producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
+ final Properties consumerProperties = new Properties();
+ consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+ consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
+ cleanup();
+ queryProducer = new KafkaProducer<>(producerProperties);
+ queryConsumer = new KafkaConsumer<>(consumerProperties);
+
+ final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+ final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+ return new InMemoryQueryRepository(changeLog);
+ }
+
+ @After
+ public void cleanup() {
+ if(queryProducer != null) {
+ queryProducer.close();
+ }
+ if(queryConsumer != null) {
+ queryConsumer.close();
+ }
}
@Test
- public void happyDeleteQueryTest() throws Exception {
- // add a query so that it can be removed.
- final List<String> addArgs = new ArrayList<>(args);
- addArgs.add("-q");
- addArgs.add("Some sparql query");
- final AddQueryCommand addCommand = new AddQueryCommand();
- addCommand.execute(addArgs.toArray(new String[] {}));
+ public void shortParams() throws Exception {
+ // Add a few queries to Rya Streams.
+ QueryRepository repo = makeQueryRepository();
+ repo.add("query1");
+ final UUID query2Id = repo.add("query2").getQueryId();
+ repo.add("query3");
- final List<String> deleteArgs = new ArrayList<>(args);
- addArgs.add("-q");
- addArgs.add("12345");
+ // Show that all three of the queries were added.
+ Set<StreamsQuery> queries = repo.list();
+ assertEquals(3, queries.size());
+
+ // Delete query 2 using the delete query command.
+ final String[] deleteArgs = new String[] {
+ "-r", "" + ryaInstance,
+ "-i", kafkaIp,
+ "-p", kafkaPort,
+ "-q", query2Id.toString()
+ };
+
final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
- deleteCommand.execute(deleteArgs.toArray(new String[] {}));
- // not sure what to assert here.
- assertEquals(true, true);
+ deleteCommand.execute(deleteArgs);
+
+ // Show query2 was deleted.
+ repo = makeQueryRepository();
+ queries = repo.list();
+ assertEquals(2, queries.size());
+
+ for(final StreamsQuery query : queries) {
+ assertNotEquals(query2Id, query.getQueryId());
+ }
}
-}
+
+ @Test
+ public void longParams() throws Exception {
+ // Add a few queries to Rya Streams.
+ QueryRepository repo = makeQueryRepository();
+ repo.add("query1");
+ final UUID query2Id = repo.add("query2").getQueryId();
+ repo.add("query3");
+
+ // Show that all three of the queries were added.
+ Set<StreamsQuery> queries = repo.list();
+ assertEquals(3, queries.size());
+
+ // Delete query 2 using the delete query command.
+ final String[] deleteArgs = new String[] {
+ "--ryaInstance", "" + ryaInstance,
+ "--kafkaHostname", kafkaIp,
+ "--kafkaPort", kafkaPort,
+ "--queryID", query2Id.toString()
+ };
+
+ final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
+ deleteCommand.execute(deleteArgs);
+
+ // Show query2 was deleted.
+ repo = makeQueryRepository();
+ queries = repo.list();
+ assertEquals(2, queries.size());
+
+ for(final StreamsQuery query : queries) {
+ assertNotEquals(query2Id, query.getQueryId());
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index be90c5f..eb759ba 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -18,13 +18,29 @@
*/
package org.apache.rya.streams.client.command;
-import static org.junit.Assert.assertEquals;
-
import java.util.Properties;
+import java.util.UUID;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.apache.rya.test.kafka.KafkaITBase;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -33,7 +49,15 @@
* integration Test for listing queries through a command.
*/
public class ListQueryCommandIT extends KafkaITBase {
- private String[] args;
+
+ private final String ryaInstance = UUID.randomUUID().toString();
+
+ private String kafkaIp;
+ private String kafkaPort;
+ private QueryRepository queryRepo;
+
+ private Producer<?, QueryChange> queryProducer = null;
+ private Consumer<?, QueryChange> queryConsumer = null;
@Rule
public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
@@ -43,18 +67,69 @@
final Properties props = rule.createBootstrapServerConfig();
final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
final String[] tokens = location.split(":");
- args = new String[] {
- "-t", rule.getKafkaTopicName(),
- "-p", tokens[1],
- "-i", tokens[0]
+
+ kafkaIp = tokens[0];
+ kafkaPort = tokens[1];
+
+ // Initialize the QueryRepository.
+ final Properties producerProperties = new Properties();
+ producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+ producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
+ final Properties consumerProperties = new Properties();
+ consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+ consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
+ queryProducer = new KafkaProducer<>(producerProperties);
+ queryConsumer = new KafkaConsumer<>(consumerProperties);
+
+ final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+ final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+ queryRepo = new InMemoryQueryRepository(changeLog);
+ }
+
+ @After
+ public void cleanup() {
+ queryProducer.close();
+ queryConsumer.close();
+ }
+
+
+ @Test
+ public void shortParams() throws Exception {
+ // Add a few queries to Rya Streams.
+ queryRepo.add("query1");
+ queryRepo.add("query2");
+ queryRepo.add("query3");
+
+ // Execute the List Queries command.
+ final String[] args = new String[] {
+ "-r", "" + ryaInstance,
+ "-i", kafkaIp,
+ "-p", kafkaPort
};
+
+ final ListQueriesCommand command = new ListQueriesCommand();
+ command.execute(args);
}
@Test
- public void happyListQueriesTest() throws Exception {
+ public void longParams() throws Exception {
+ // Add a few queries to Rya Streams.
+ queryRepo.add("query1");
+ queryRepo.add("query2");
+ queryRepo.add("query3");
+
+ // Execute the List Queries command.
+ final String[] args = new String[] {
+ "--ryaInstance", "" + ryaInstance,
+ "--kafkaHostname", kafkaIp,
+ "--kafkaPort", kafkaPort
+ };
+
final ListQueriesCommand command = new ListQueriesCommand();
command.execute(args);
- // not sure what to assert here.
- assertEquals(true, true);
}
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
new file mode 100644
index 0000000..95a4876
--- /dev/null
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.command;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+/**
+ * Integration tests the methods of {@link LoadStatementsCommand}.
+ */
+public class LoadStatementsCommandIT {
+
+ private static final Path TURTLE_FILE = Paths.get("src/test/resources/statements.ttl");
+
+ private final String ryaInstance = UUID.randomUUID().toString();
+
+ private String kafkaIp;
+ private String kafkaPort;
+
+ @Rule
+ public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
+
+ @Before
+ public void setup() {
+ final Properties props = rule.createBootstrapServerConfig();
+ final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ final String[] tokens = location.split(":");
+
+ kafkaIp = tokens[0];
+ kafkaPort = tokens[1];
+ }
+
+ @Test
+ public void shortParams() throws Exception {
+ // Load a file of statements into Kafka.
+ final String visibilities = "a|b|c";
+ final String[] args = new String[] {
+ "-r", "" + ryaInstance,
+ "-i", kafkaIp,
+ "-p", kafkaPort,
+ "-f", TURTLE_FILE.toString(),
+ "-v", visibilities
+ };
+
+ new LoadStatementsCommand().execute(args);
+
+ // Show that the statements were loaded into the topic.
+        // Read the VisibilityStatements from the test topic.
+ final List<VisibilityStatement> read = new ArrayList<>();
+
+ final Properties consumerProps = new Properties();
+ consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+ consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName());
+
+ try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) {
+ final String topic = KafkaTopics.statementsTopic(ryaInstance);
+ consumer.subscribe(Arrays.asList(topic));
+ final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000);
+
+ assertEquals(3, records.count());
+ final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator();
+ while(iter.hasNext()) {
+ final VisibilityStatement visiSet = iter.next().value();
+ read.add(visiSet);
+ }
+ }
+
+ final ValueFactory VF = ValueFactoryImpl.getInstance();
+ final List<VisibilityStatement> expected = new ArrayList<>();
+ expected.add(new VisibilityStatement(
+ VF.createStatement(VF.createURI("http://example#alice"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#bob")),
+ visibilities));
+ expected.add(new VisibilityStatement(
+ VF.createStatement(VF.createURI("http://example#bob"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#charlie")),
+ visibilities));
+ expected.add(new VisibilityStatement(
+ VF.createStatement(VF.createURI("http://example#charlie"), VF.createURI("http://example#likes"), VF.createURI("http://example#icecream")),
+ visibilities));
+
+        // Show the written statements match the read ones.
+ assertEquals(expected, read);
+ }
+
+ @Test
+ public void longParams() throws Exception {
+ // Load a file of statements into Kafka.
+ final String visibilities = "a|b|c";
+ final String[] args = new String[] {
+ "--ryaInstance", "" + ryaInstance,
+ "--kafkaHostname", kafkaIp,
+ "--kafkaPort", kafkaPort,
+ "--statementsFile", TURTLE_FILE.toString(),
+ "--visibilities", visibilities
+ };
+
+ new LoadStatementsCommand().execute(args);
+
+ // Show that the statements were loaded into the topic.
+        // Read the VisibilityStatements from the test topic.
+ final List<VisibilityStatement> read = new ArrayList<>();
+
+ final Properties consumerProps = new Properties();
+ consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+ consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName());
+
+ try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) {
+ final String topic = KafkaTopics.statementsTopic(ryaInstance);
+ consumer.subscribe(Arrays.asList(topic));
+ final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000);
+
+ assertEquals(3, records.count());
+ final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator();
+ while(iter.hasNext()) {
+ final VisibilityStatement visiSet = iter.next().value();
+ read.add(visiSet);
+ }
+ }
+
+ final ValueFactory VF = ValueFactoryImpl.getInstance();
+ final List<VisibilityStatement> expected = new ArrayList<>();
+ expected.add(new VisibilityStatement(
+ VF.createStatement(VF.createURI("http://example#alice"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#bob")),
+ visibilities));
+ expected.add(new VisibilityStatement(
+ VF.createStatement(VF.createURI("http://example#bob"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#charlie")),
+ visibilities));
+ expected.add(new VisibilityStatement(
+ VF.createStatement(VF.createURI("http://example#charlie"), VF.createURI("http://example#likes"), VF.createURI("http://example#icecream")),
+ visibilities));
+
+        // Show the written statements match the read ones.
+ assertEquals(expected, read);
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/test/resources/statements.ttl b/extras/rya.streams/client/src/test/resources/statements.ttl
new file mode 100644
index 0000000..c19e22d
--- /dev/null
+++ b/extras/rya.streams/client/src/test/resources/statements.ttl
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+@prefix example: <http://example#> .
+
+example:alice example:talksTo example:bob .
+example:bob example:talksTo example:charlie .
+example:charlie example:likes example:icecream .
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
new file mode 100644
index 0000000..dfc4c9d
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates the Kafka topic names that are used for Rya Streams systems.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaTopics {
+
+ /**
+ * Creates the Kafka topic that will be used for a specific instance of Rya's {@link QueryChangeLog}.
+ *
+ * @param ryaInstance - The Rya instance the change log is for. (not null)
+ * @return The name of the Kafka topic.
+ */
+ public static String queryChangeLogTopic(final String ryaInstance) {
+ return ryaInstance + "-QueryChangeLog";
+ }
+
+ /**
+ * Creates the Kafka topic that will be used to load statements into the Rya Streams system for a specific
+ * instance of Rya.
+ *
+ * @param ryaInstance - The Rya instance the statements are for. (not null)
+ * @return The name of the Kafka topic.
+ */
+ public static String statementsTopic(final String ryaInstance) {
+ return ryaInstance + "-Statements";
+ }
+}
\ No newline at end of file