RYA-377 Implemented integration tests for the client commands.
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java
index 82ca691..946944f 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java
@@ -38,10 +38,9 @@
     private final QueryRepository repository;
 
     /**
-     * Creates a new {@link DefaultAddQuery}.
+     * Creates a new {@link DefaultListQueries}.
      *
-     * @param repository - The {@link QueryRepository} to add a query to. (not
-     *        null)
+     * @param repository - The {@link QueryRepository} that hosts the listed queries. (not null)
      */
     public DefaultListQueries(final QueryRepository repository) {
         this.repository = requireNonNull(repository);
@@ -51,4 +50,4 @@
     public Set<StreamsQuery> all() throws RyaStreamsException {
         return repository.list();
     }
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
index 967b79e..5d64785 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
@@ -26,23 +26,22 @@
 import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
- * A command that may be executed by the {@link PcjAdminClient}.
+ * A command that may be executed by the Rya Streams {@link CLIDriver}.
  */
 @DefaultAnnotation(NonNull.class)
 public interface RyaStreamsCommand {
+
     /**
-     * Command line parameters that are used by this command to configure
-     * itself.
+     * Command line parameters that are used by all commands that interact with Kafka.
      */
-    class Parameters {
-        @Parameter(names = { "--topic",
-        "-t" }, required = true, description = "The kafka topic to load the statements into.")
-        public String topicName;
-        @Parameter(names = { "--kafkaPort",
-        "-p" }, required = true, description = "The port to use to connect to Kafka.")
+    class KafkaParameters {
+        @Parameter(names = {"--ryaInstance", "-r"}, required = true, description = "The name of the Rya Instance the Rya Streams is a part of.")
+        public String ryaInstance;
+
+        @Parameter(names = { "--kafkaPort", "-p" }, required = true, description = "The port to use to connect to Kafka.")
         public String kafkaPort;
-        @Parameter(names = { "--kafkaHostname",
-        "-i" }, required = true, description = "The IP or Hostname to use to connect to Kafka.")
+
+        @Parameter(names = { "--kafkaHostname", "-i" }, required = true, description = "The IP or Hostname to use to connect to Kafka.")
         public String kafkaIP;
 
         @Override
@@ -51,8 +50,8 @@
             parameters.append("Parameters");
             parameters.append("\n");
 
-            if (!Strings.isNullOrEmpty(topicName)) {
-                parameters.append("\tTopic: " + topicName);
+            if(!Strings.isNullOrEmpty(ryaInstance)) {
+                parameters.append("\tRya Instance Name: " + ryaInstance + "\n");
             }
 
             if (!Strings.isNullOrEmpty(kafkaIP)) {
@@ -82,7 +81,7 @@
      * @return Describes what arguments may be provided to the command.
      */
     default public String getUsage() {
-        final JCommander parser = new JCommander(new Parameters());
+        final JCommander parser = new JCommander(new KafkaParameters());
 
         final StringBuilder usage = new StringBuilder();
         parser.usage(usage);
@@ -127,4 +126,4 @@
             super(message, cause);
         }
     }
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
index dfaa6c6..8439f20 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
@@ -40,6 +40,7 @@
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryRepository;
 import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
@@ -62,12 +63,10 @@
     private static final Logger log = LoggerFactory.getLogger(AddQueryCommand.class);
 
     /**
-     * Command line parameters that are used by this command to configure
-     * itself.
+     * Command line parameters that are used by this command to configure itself.
      */
-    private class AddParameters extends RyaStreamsCommand.Parameters {
-        @Parameter(names = { "--query",
-        "-q" }, required = true, description = "The SPARQL query to add to Rya Streams.")
+    private class AddParameters extends RyaStreamsCommand.KafkaParameters {
+        @Parameter(names = { "--query", "-q" }, required = true, description = "The SPARQL query to add to Rya Streams.")
         private String query;
 
         @Override
@@ -121,15 +120,19 @@
         producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
         producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
         final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties);
         final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties);
 
-        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, params.topicName);
+        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance));
         final QueryRepository repo = new InMemoryQueryRepository(changeLog);
+
+        // Execute the add query command.
         final AddQuery addQuery = new DefaultAddQuery(repo);
         try {
             final StreamsQuery query = addQuery.addQuery(params.query);
@@ -140,4 +143,4 @@
 
         log.trace("Finished executing the Add Query Command.");
     }
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
index 65a7017..b101a0f 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
@@ -40,6 +40,7 @@
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryRepository;
 import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
@@ -62,10 +63,9 @@
     private static final Logger log = LoggerFactory.getLogger(DeleteQueryCommand.class);
 
     /**
-     * Command line parameters that are used by this command to configure
-     * itself.
+     * Command line parameters that are used by this command to configure itself.
      */
-    private class RemoveParameters extends RyaStreamsCommand.Parameters {
+    private class RemoveParameters extends RyaStreamsCommand.KafkaParameters {
         @Parameter(names = { "--queryID", "-q" }, required = true, description = "The ID of the query to remove from Rya Streams.")
         private String queryId;
 
@@ -73,7 +73,6 @@
         public String toString() {
             final StringBuilder parameters = new StringBuilder();
             parameters.append(super.toString());
-            parameters.append("\n");
 
             if (!Strings.isNullOrEmpty(queryId)) {
                 parameters.append("\tQueryID: " + queryId);
@@ -120,23 +119,27 @@
         producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
         producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
         final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties);
         final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties);
 
-        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, params.topicName);
+        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance));
         final QueryRepository repo = new InMemoryQueryRepository(changeLog);
+
+        // Execute the delete query command.
         final DeleteQuery deleteQuery = new DefaultDeleteQuery(repo);
         try {
             deleteQuery.delete(UUID.fromString(params.queryId));
             log.trace("Deleted query: " + params.queryId);
         } catch (final RyaStreamsException e) {
-            log.error("Unable to parse query: " + params.queryId, e);
+            log.error("Unable to delete query with ID: " + params.queryId, e);
         }
 
-        log.trace("Finished executing the Add Query Command.");
+        log.trace("Finished executing the Delete Query Command.");
     }
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index ec40b50..c4e5de6 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -41,6 +41,7 @@
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryRepository;
 import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
@@ -75,7 +76,7 @@
         requireNonNull(args);
 
         // Parse the command line arguments.
-        final Parameters params = new Parameters();
+        final KafkaParameters params = new KafkaParameters();
         try {
             new JCommander(params, args);
         } catch (final ParameterException e) {
@@ -88,25 +89,29 @@
         producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
         producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
         final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties);
         final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties);
 
-        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, params.topicName);
+        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, KafkaTopics.queryChangeLogTopic(params.ryaInstance));
         final QueryRepository repo = new InMemoryQueryRepository(changeLog);
+
+        // Execute the list queries command.
         final ListQueries listQueries = new DefaultListQueries(repo);
         try {
             final Set<StreamsQuery> queries = listQueries.all();
-            logQueries(queries);
+            System.out.println( formatQueries(queries) );
         } catch (final RyaStreamsException e) {
             log.error("Unable to retrieve the queries.", e);
         }
     }
 
-    private void logQueries(final Set<StreamsQuery> queries) {
+    private String formatQueries(final Set<StreamsQuery> queries) {
         final StringBuilder sb = new StringBuilder();
         sb.append("\n");
         sb.append("Queries in Rya Streams:\n");
@@ -119,6 +124,6 @@
             sb.append(query.getSparql());
             sb.append("\n");
         });
-        log.trace(sb.toString());
+        return sb.toString();
     }
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
index 057de77..4763bd8 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
@@ -27,10 +27,13 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.api.interactor.LoadStatements;
 import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,9 +56,11 @@
     /**
      * Command line parameters that are used by this command to configure itself.
      */
-    private static final class LoadStatementsParameters extends RyaStreamsCommand.Parameters {
+    private static final class LoadStatementsParameters extends RyaStreamsCommand.KafkaParameters {
+
         @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.")
         private String statementsFile;
+
         @Parameter(names= {"--visibilities", "-v"}, required = true, description = "The visibilities to assign to the statements being loaded in.")
         private String visibilities;
 
@@ -63,7 +68,6 @@
         public String toString() {
             final StringBuilder parameters = new StringBuilder();
             parameters.append(super.toString());
-            parameters.append("\n");
 
             if (!Strings.isNullOrEmpty(statementsFile)) {
                 parameters.append("\tStatements File: " + statementsFile);
@@ -117,7 +121,7 @@
 
         final Properties producerProps = buildProperties(params);
         try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(producerProps)) {
-            final LoadStatements statements = new KafkaLoadStatements(params.topicName, producer);
+            final LoadStatements statements = new KafkaLoadStatements(KafkaTopics.statementsTopic(params.ryaInstance), producer);
             statements.load(statementsPath, params.visibilities);
         } catch (final Exception e) {
             log.error("Unable to parse statements file: " + statementsPath.toString(), e);
@@ -130,6 +134,8 @@
         requireNonNull(params);
         final Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityStatementSerializer.class.getName());
         return props;
     }
 }
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index 6b13b46..09e874c 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -21,10 +21,30 @@
 import static org.junit.Assert.assertEquals;
 
 import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaITBase;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -33,7 +53,15 @@
  * integration Test for adding a new query through a command.
  */
 public class AddQueryCommandIT extends KafkaITBase {
-    private String[] args;
+
+    private final String ryaInstance = UUID.randomUUID().toString();
+
+    private String kafkaIp;
+    private String kafkaPort;
+    private QueryRepository queryRepo;
+
+    private Producer<?, QueryChange> queryProducer = null;
+    private Consumer<?, QueryChange> queryConsumer = null;
 
     @Rule
     public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
@@ -43,19 +71,74 @@
         final Properties props = rule.createBootstrapServerConfig();
         final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
         final String[] tokens = location.split(":");
-        args = new String[] {
-                "-q", "Some sparql query",
-                "-t", rule.getKafkaTopicName(),
-                "-p", tokens[1],
-                "-i", tokens[0]
-        };
+
+        kafkaIp = tokens[0];
+        kafkaPort = tokens[1];
+
+        // Initialize the QueryRepository.
+        final Properties producerProperties = new Properties();
+        producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+        producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
+        final Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
+        queryProducer = new KafkaProducer<>(producerProperties);
+        queryConsumer = new KafkaConsumer<>(consumerProperties);
+
+        final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+        queryRepo = new InMemoryQueryRepository(changeLog);
+    }
+
+    @After
+    public void cleanup() {
+        queryProducer.close();
+        queryConsumer.close();
     }
 
     @Test
-    public void happyAddQueryTest() throws Exception {
+    public void shortParams() throws Exception {
+        // Arguments that add a query to Rya Streams.
+        final String query = "SELECT * WHERE { ?person <urn:name> ?name }";
+        final String[] args = new String[] {
+                "-r", "" + ryaInstance,
+                "-i", kafkaIp,
+                "-p", kafkaPort,
+                "-q", query
+        };
+
+        // Execute the command.
         final AddQueryCommand command = new AddQueryCommand();
         command.execute(args);
-        // not sure what to assert here.
-        assertEquals(true, true);
+
+        // Show that the query was added to the Query Repository.
+        final Set<StreamsQuery> queries = queryRepo.list();
+        assertEquals(1, queries.size());
+        assertEquals(query, queries.iterator().next().getSparql());
     }
-}
+
+    @Test
+    public void longParams() throws Exception {
+        // Arguments that add a query to Rya Streams.
+        final String query = "SELECT * WHERE { ?person <urn:name> ?name }";
+        final String[] args = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafkaIp,
+                "--kafkaPort", kafkaPort,
+                "--query", query
+        };
+
+        // Execute the command.
+        final AddQueryCommand command = new AddQueryCommand();
+        command.execute(args);
+
+        // Show that the query was added to the Query Repository.
+        final Set<StreamsQuery> queries = queryRepo.list();
+        assertEquals(1, queries.size());
+        assertEquals(query, queries.iterator().next().getSparql());
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index db8c200..0079371 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -19,25 +19,49 @@
 package org.apache.rya.streams.client.command;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaITBase;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-
 /**
  * Integration Test for deleting a query from Rya Streams through a command.
  */
 public class DeleteQueryCommandIT extends KafkaITBase {
-    private List<String> args;
+
+    private final String ryaInstance = UUID.randomUUID().toString();
+
+    private String kafkaIp;
+    private String kafkaPort;
+
+    private Producer<?, QueryChange> queryProducer = null;
+    private Consumer<?, QueryChange> queryConsumer = null;
 
     @Rule
     public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
@@ -48,28 +72,107 @@
         final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
         final String[] tokens = location.split(":");
 
-        args = Lists.newArrayList(
-                "-t", rule.getKafkaTopicName(),
-                "-p", tokens[1],
-                "-i", tokens[0]
-                );
+        kafkaIp = tokens[0];
+        kafkaPort = tokens[1];
+    }
+
+    /**
+     * This test simulates executing many commands and each of them use their own InMemoryQueryRepository. We need
+     * to re-create the repo outside of the command to ensure it has the most up to date values inside of it.
+     */
+    private QueryRepository makeQueryRepository() {
+        final Properties producerProperties = new Properties();
+        producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+        producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
+        final Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
+        cleanup();
+        queryProducer = new KafkaProducer<>(producerProperties);
+        queryConsumer = new KafkaConsumer<>(consumerProperties);
+
+        final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+        return new InMemoryQueryRepository(changeLog);
+    }
+
+    @After
+    public void cleanup() {
+        if(queryProducer != null) {
+            queryProducer.close();
+        }
+        if(queryConsumer != null) {
+            queryConsumer.close();
+        }
     }
 
     @Test
-    public void happyDeleteQueryTest() throws Exception {
-        // add a query so that it can be removed.
-        final List<String> addArgs = new ArrayList<>(args);
-        addArgs.add("-q");
-        addArgs.add("Some sparql query");
-        final AddQueryCommand addCommand = new AddQueryCommand();
-        addCommand.execute(addArgs.toArray(new String[] {}));
+    public void shortParams() throws Exception {
+        // Add a few queries to Rya Streams.
+        QueryRepository repo = makeQueryRepository();
+        repo.add("query1");
+        final UUID query2Id = repo.add("query2").getQueryId();
+        repo.add("query3");
 
-        final List<String> deleteArgs = new ArrayList<>(args);
-        addArgs.add("-q");
-        addArgs.add("12345");
+        // Show that all three of the queries were added.
+        Set<StreamsQuery> queries = repo.list();
+        assertEquals(3, queries.size());
+
+        // Delete query 2 using the delete query command.
+        final String[] deleteArgs = new String[] {
+                "-r", "" + ryaInstance,
+                "-i", kafkaIp,
+                "-p", kafkaPort,
+                "-q", query2Id.toString()
+        };
+
         final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
-        deleteCommand.execute(deleteArgs.toArray(new String[] {}));
-        // not sure what to assert here.
-        assertEquals(true, true);
+        deleteCommand.execute(deleteArgs);
+
+        // Show query2 was deleted.
+        repo = makeQueryRepository();
+        queries = repo.list();
+        assertEquals(2, queries.size());
+
+        for(final StreamsQuery query : queries) {
+            assertNotEquals(query2Id, query.getQueryId());
+        }
     }
-}
+
+    @Test
+    public void longParams() throws Exception {
+        // Add a few queries to Rya Streams.
+        QueryRepository repo = makeQueryRepository();
+        repo.add("query1");
+        final UUID query2Id = repo.add("query2").getQueryId();
+        repo.add("query3");
+
+        // Show that all three of the queries were added.
+        Set<StreamsQuery> queries = repo.list();
+        assertEquals(3, queries.size());
+
+        // Delete query 2 using the delete query command.
+        final String[] deleteArgs = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafkaIp,
+                "--kafkaPort", kafkaPort,
+                "--queryID", query2Id.toString()
+        };
+
+        final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
+        deleteCommand.execute(deleteArgs);
+
+        // Show query2 was deleted.
+        repo = makeQueryRepository();
+        queries = repo.list();
+        assertEquals(2, queries.size());
+
+        for(final StreamsQuery query : queries) {
+            assertNotEquals(query2Id, query.getQueryId());
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index be90c5f..eb759ba 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -18,13 +18,29 @@
  */
 package org.apache.rya.streams.client.command;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.Properties;
+import java.util.UUID;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaITBase;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -33,7 +49,15 @@
  * integration Test for listing queries through a command.
  */
 public class ListQueryCommandIT extends KafkaITBase {
-    private String[] args;
+
+    private final String ryaInstance = UUID.randomUUID().toString();
+
+    private String kafkaIp;
+    private String kafkaPort;
+    private QueryRepository queryRepo;
+
+    private Producer<?, QueryChange> queryProducer = null;
+    private Consumer<?, QueryChange> queryConsumer = null;
 
     @Rule
     public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
@@ -43,18 +67,69 @@
         final Properties props = rule.createBootstrapServerConfig();
         final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
         final String[] tokens = location.split(":");
-        args = new String[] {
-                "-t", rule.getKafkaTopicName(),
-                "-p", tokens[1],
-                "-i", tokens[0]
+
+        kafkaIp = tokens[0];
+        kafkaPort = tokens[1];
+
+        // Initialize the QueryRepository.
+        final Properties producerProperties = new Properties();
+        producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+        producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+
+        final Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+
+        queryProducer = new KafkaProducer<>(producerProperties);
+        queryConsumer = new KafkaConsumer<>(consumerProperties);
+
+        final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+        queryRepo = new InMemoryQueryRepository(changeLog);
+    }
+
+    @After
+    public void cleanup() {
+        queryProducer.close();
+        queryConsumer.close();
+    }
+
+
+    @Test
+    public void shortParams() throws Exception {
+        // Add a few queries to Rya Streams.
+        queryRepo.add("query1");
+        queryRepo.add("query2");
+        queryRepo.add("query3");
+
+        // Execute the List Queries command.
+        final String[] args = new String[] {
+                "-r", "" + ryaInstance,
+                "-i", kafkaIp,
+                "-p", kafkaPort
         };
+
+        final ListQueriesCommand command = new ListQueriesCommand();
+        command.execute(args);
     }
 
     @Test
-    public void happyListQueriesTest() throws Exception {
+    public void longParams() throws Exception {
+        // Add a few queries to Rya Streams.
+        queryRepo.add("query1");
+        queryRepo.add("query2");
+        queryRepo.add("query3");
+
+        // Execute the List Queries command.
+        final String[] args = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafkaIp,
+                "--kafkaPort", kafkaPort
+        };
+
         final ListQueriesCommand command = new ListQueriesCommand();
         command.execute(args);
-        // not sure what to assert here.
-        assertEquals(true, true);
     }
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
new file mode 100644
index 0000000..95a4876
--- /dev/null
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.command;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+/**
+ * Integration tests the methods of {@link LoadStatementsCommand}.
+ */
+public class LoadStatementsCommandIT {
+
+    private static final Path TURTLE_FILE = Paths.get("src/test/resources/statements.ttl");
+
+    private final String ryaInstance = UUID.randomUUID().toString();
+
+    private String kafkaIp;
+    private String kafkaPort;
+
+    @Rule
+    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
+
+    @Before
+    public void setup() {
+        final Properties props = rule.createBootstrapServerConfig();
+        final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+        final String[] tokens = location.split(":");
+
+        kafkaIp = tokens[0];
+        kafkaPort = tokens[1];
+    }
+
+    @Test
+    public void shortParams() throws Exception {
+        // Load a file of statements into Kafka.
+        final String visibilities = "a|b|c";
+        final String[] args = new String[] {
+                "-r", "" + ryaInstance,
+                "-i", kafkaIp,
+                "-p", kafkaPort,
+                "-f", TURTLE_FILE.toString(),
+                "-v", visibilities
+        };
+
+        new LoadStatementsCommand().execute(args);
+
+        // Show that the statements were loaded into the topic.
+        // Read a VisibilityBindingSet from the test topic.
+        final List<VisibilityStatement> read = new ArrayList<>();
+
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName());
+
+        try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) {
+            final String topic = KafkaTopics.statementsTopic(ryaInstance);
+            consumer.subscribe(Arrays.asList(topic));
+            final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000);
+
+            assertEquals(3, records.count());
+            final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator();
+            while(iter.hasNext()) {
+                final VisibilityStatement visiSet = iter.next().value();
+                read.add(visiSet);
+            }
+        }
+
+        final ValueFactory VF = ValueFactoryImpl.getInstance();
+        final List<VisibilityStatement> expected = new ArrayList<>();
+        expected.add(new VisibilityStatement(
+                VF.createStatement(VF.createURI("http://example#alice"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#bob")),
+                visibilities));
+        expected.add(new VisibilityStatement(
+                VF.createStatement(VF.createURI("http://example#bob"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#charlie")),
+                visibilities));
+        expected.add(new VisibilityStatement(
+                VF.createStatement(VF.createURI("http://example#charlie"), VF.createURI("http://example#likes"), VF.createURI("http://example#icecream")),
+                visibilities));
+
+        // Show the written statements matches the read ones.
+        assertEquals(expected, read);
+    }
+
+    @Test
+    public void longParams() throws Exception {
+        // Load a file of statements into Kafka.
+        final String visibilities = "a|b|c";
+        final String[] args = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafkaIp,
+                "--kafkaPort", kafkaPort,
+                "--statementsFile", TURTLE_FILE.toString(),
+                "--visibilities", visibilities
+        };
+
+        new LoadStatementsCommand().execute(args);
+
+        // Show that the statements were loaded into the topic.
+        // Read a VisibilityBindingSet from the test topic.
+        final List<VisibilityStatement> read = new ArrayList<>();
+
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
+        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName());
+
+        try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) {
+            final String topic = KafkaTopics.statementsTopic(ryaInstance);
+            consumer.subscribe(Arrays.asList(topic));
+            final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000);
+
+            assertEquals(3, records.count());
+            final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator();
+            while(iter.hasNext()) {
+                final VisibilityStatement visiSet = iter.next().value();
+                read.add(visiSet);
+            }
+        }
+
+        final ValueFactory VF = ValueFactoryImpl.getInstance();
+        final List<VisibilityStatement> expected = new ArrayList<>();
+        expected.add(new VisibilityStatement(
+                VF.createStatement(VF.createURI("http://example#alice"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#bob")),
+                visibilities));
+        expected.add(new VisibilityStatement(
+                VF.createStatement(VF.createURI("http://example#bob"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#charlie")),
+                visibilities));
+        expected.add(new VisibilityStatement(
+                VF.createStatement(VF.createURI("http://example#charlie"), VF.createURI("http://example#likes"), VF.createURI("http://example#icecream")),
+                visibilities));
+
+        // Show the written statements matches the read ones.
+        assertEquals(expected, read);
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/test/resources/statements.ttl b/extras/rya.streams/client/src/test/resources/statements.ttl
new file mode 100644
index 0000000..c19e22d
--- /dev/null
+++ b/extras/rya.streams/client/src/test/resources/statements.ttl
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+@prefix example: <http://example#> .
+
+example:alice example:talksTo example:bob . 
+example:bob example:talksTo example:charlie . 
+example:charlie example:likes example:icecream .
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
new file mode 100644
index 0000000..dfc4c9d
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates the Kafka topic names that are used for Rya Streams systems.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaTopics {
+
+    /**
+     * Creates the Kafka topic that will be used for a specific instance of Rya's {@link QueryChangeLog}.
+     *
+     * @param ryaInstance - The Rya instance the change log is for. (not null)
+     * @return The name of the Kafka topic.
+     */
+    public static String queryChangeLogTopic(final String ryaInstance) {
+        return ryaInstance + "-QueryChangeLog";
+    }
+
+    /**
+     * Creates the Kafka topic that will be used to load statements into the Rya Streams system for a specific
+     * instance of Rya.
+     *
+     * @param ryaInstance - The Rya instance the statements are for. (not null)
+     * @return The name of the Kafka topic.
+     */
+    public static String statementsTopic(final String ryaInstance) {
+        return ryaInstance + "-Statements";
+    }
+}
\ No newline at end of file