RYA-440 Added commands to Rya Shell used to interact with Rya Streams. Closes #267.
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
index 92b18a1..c122f43 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
@@ -45,6 +45,7 @@
     private final ListInstances listInstances;
     private final Optional<AddUser> addUser;
     private final Optional<RemoveUser> removeUser;
+    private final SetRyaStreamsConfiguration setRyaStreamsConfig;
     private final Uninstall uninstall;
     private final LoadStatements loadStatements;
     private final LoadStatementsFile loadStatementsFile;
@@ -65,6 +66,7 @@
             final ListInstances listInstances,
             final Optional<AddUser> addUser,
             final Optional<RemoveUser> removeUser,
+            final SetRyaStreamsConfiguration setRyaStreamsConfig,
             final Uninstall uninstall,
             final LoadStatements loadStatements,
             final LoadStatementsFile loadStatementsFile,
@@ -81,6 +83,7 @@
         this.listInstances = requireNonNull(listInstances);
         this.addUser = requireNonNull(addUser);
         this.removeUser = requireNonNull(removeUser);
+        this.setRyaStreamsConfig = requireNonNull(setRyaStreamsConfig);
         this.uninstall = requireNonNull(uninstall);
         this.loadStatements = requireNonNull(loadStatements);
         this.loadStatementsFile = requireNonNull(loadStatementsFile);
@@ -176,6 +179,13 @@
     }
 
     /**
+     * @return An instance of {@link SetRyaStreamsConfiguration} that is connected to a Rya storage.
+     */
+    public SetRyaStreamsConfiguration getSetRyaStreamsConfiguration() {
+        return setRyaStreamsConfig;
+    }
+
+    /**
      * @return An instance of {@link Uninstall} that is connected to a Rya storage.
      */
     public Uninstall getUninstall() {
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java b/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..5e75f06
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Update which Rya Streams subsystem a Rya instance is connected to.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface SetRyaStreamsConfiguration {
+
+    /**
+     * Update which Rya Streams subsystem a Rya instance is connected to.
+     *
+     * @param instanceName - Indicates which Rya instance will have a Rya Streams subsystem assigned to it. (not null)
+     * @param streamsDetails - Indicates which Rya Streams subsystem the instance will use. (not null)
+     * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+     * @throws RyaClientException Something caused the command to fail.
+     */
+    public void setRyaStreamsConfiguration(String ryaInstance, RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException;
+}
\ No newline at end of file
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java b/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
index 9d2c1e5..bda7390 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
@@ -29,25 +29,22 @@
 import java.util.Map.Entry;
 import java.util.Objects;
 
-import edu.umd.cs.findbugs.annotations.Nullable;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
-
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
 /**
  * Details about how a Rya instance's state.
  */
 @Immutable
 @DefaultAnnotation(NonNull.class)
 public class RyaDetails implements Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     // General metadata about the instance.
     private final String instanceName;
@@ -68,6 +65,9 @@
     private final ProspectorDetails prospectorDetails;
     private final JoinSelectivityDetails joinSelectivityDetails;
 
+    // Rya Streams Details.
+    private final Optional<RyaStreamsDetails> ryaStreamsDetails;
+
     /**
      * Private to prevent initialization through the constructor. To build
      * instances of this class, use the {@link Builder}.
@@ -82,7 +82,8 @@
             final TemporalIndexDetails temporalDetails,
             final FreeTextIndexDetails freeTextDetails,
             final ProspectorDetails prospectorDetails,
-            final JoinSelectivityDetails joinSelectivityDetails) {
+            final JoinSelectivityDetails joinSelectivityDetails,
+            final Optional<RyaStreamsDetails> ryaStreamsDetails) {
         this.instanceName = requireNonNull(instanceName);
         this.version = requireNonNull(version);
         this.users = requireNonNull(users);
@@ -93,6 +94,7 @@
         this.freeTextDetails = requireNonNull(freeTextDetails);
         this.prospectorDetails = requireNonNull(prospectorDetails);
         this.joinSelectivityDetails = requireNonNull(joinSelectivityDetails);
+        this.ryaStreamsDetails = requireNonNull(ryaStreamsDetails);
     }
 
     /**
@@ -168,6 +170,13 @@
         return joinSelectivityDetails;
     }
 
+    /**
+     * @return Information about the instance's Rya Streams integration, if it was set.
+     */
+    public Optional<RyaStreamsDetails> getRyaStreamsDetails() {
+        return ryaStreamsDetails;
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(
@@ -179,7 +188,8 @@
                 temporalDetails,
                 freeTextDetails,
                 prospectorDetails,
-                joinSelectivityDetails);
+                joinSelectivityDetails,
+                ryaStreamsDetails);
     }
 
     @Override
@@ -197,7 +207,8 @@
                     Objects.equals(temporalDetails, details.temporalDetails) &&
                     Objects.equals(freeTextDetails, details.freeTextDetails) &&
                     Objects.equals(prospectorDetails, details.prospectorDetails) &&
-                    Objects.equals(joinSelectivityDetails, details.joinSelectivityDetails);
+                    Objects.equals(joinSelectivityDetails, details.joinSelectivityDetails) &&
+                    Objects.equals(ryaStreamsDetails, details.ryaStreamsDetails);
         }
         return false;
     }
@@ -239,6 +250,9 @@
         private ProspectorDetails prospectorDetails;
         private JoinSelectivityDetails joinSelectivityDetails;
 
+        // Rya Streams Details.
+        private RyaStreamsDetails ryaStreamsDetails;
+
         /**
          * Construcst an empty instance of {@link Builder}.
          */
@@ -262,6 +276,7 @@
             freeTextDetails = details.freeTextDetails;
             prospectorDetails = details.prospectorDetails;
             joinSelectivityDetails = details.joinSelectivityDetails;
+            ryaStreamsDetails = details.ryaStreamsDetails.orNull();
         }
 
         /**
@@ -375,6 +390,15 @@
         }
 
         /**
+         * @param ryaStreamsDetails - Information about the instance's Rya Streams integration.
+         * @return This {@link Builder} so that method invocations may be chained.
+         */
+        public Builder setRyaStreamsDetails(@Nullable final RyaStreamsDetails ryaStreamsDetails) {
+            this.ryaStreamsDetails = ryaStreamsDetails;
+            return this;
+        }
+
+        /**
          * @return An instance of {@link RyaDetails} built using this
          *   builder's values.
          */
@@ -389,7 +413,8 @@
                     temporalDetails,
                     freeTextDetails,
                     prospectorDetails,
-                    joinSelectivityDetails);
+                    joinSelectivityDetails,
+                    Optional.fromNullable(ryaStreamsDetails));
         }
     }
 
@@ -1071,4 +1096,57 @@
             return false;
         }
     }
+
+    /**
+     * Details about the Rya instance's Rya Streams integration.
+     */
+    public static class RyaStreamsDetails implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String hostname;
+        private final int port;
+
+        /**
+         * Constructs an instance of {@link RyaStreamsDetails}.
+         *
+         * @param hostname - The hostname used to communicate with the Rya Streams subsystem. (not null)
+         * @param port - The port used to communicate with the Rya Streams subsystem.
+         */
+        public RyaStreamsDetails(final String hostname, final int port) {
+            this.hostname = requireNonNull(hostname);
+            this.port = port;
+        }
+
+        /**
+         * @return The hostname used to communicate with the Rya Streams subsystem.
+         */
+        public String getHostname() {
+            return hostname;
+        }
+
+        /**
+         * @return The port used to communicate with the Rya Streams subsystem.
+         */
+        public int getPort() {
+            return port;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(hostname, port);
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if(this == obj) {
+                return true;
+            }
+            if(obj instanceof RyaStreamsDetails) {
+                final RyaStreamsDetails other = (RyaStreamsDetails) obj;
+                return Objects.equals(hostname, other.hostname) &&
+                        port == other.port;
+            }
+            return false;
+        }
+    }
 }
\ No newline at end of file
diff --git a/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java b/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
index a356877..b6e92e0 100644
--- a/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
+++ b/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
@@ -31,6 +31,7 @@
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
 import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
 import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
 import org.junit.Test;
 
@@ -65,7 +66,8 @@
                                     .setId("pcj 2")
                                     .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
             .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
-            .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) );
+            .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+            .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5));
 
         final RyaDetails details1 = builder.build();
         final RyaDetails details2 = builder.build();
@@ -96,7 +98,8 @@
                                     .setId("pcj 2")
                                     .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
             .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
-            .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) );
+            .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+            .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5));
 
         final RyaDetails details1 = builder.build();
         final RyaDetails details2 = builder.build();
@@ -127,6 +130,7 @@
                                     .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
             .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
             .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+            .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5))
             .build();
 
         // Create a new Builder using another RyaDetails object.
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
index 8010808..f86c150 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
@@ -32,10 +32,10 @@
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
 import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
 import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.mongodb.BasicDBList;
 import com.mongodb.BasicDBObject;
 import com.mongodb.BasicDBObjectBuilder;
@@ -48,7 +48,6 @@
  * Serializes configuration details for use in Mongo.
  * The {@link DBObject} will look like:
  * <pre>
- * {@code
  * {
  *   "instanceName": &lt;string&gt;,
  *   "version": &lt;string&gt;?,
@@ -68,6 +67,10 @@
  *   "freeTextDetails": &lt;boolean&gt;,
  *   "prospectorDetails": &lt;date&gt;,
  *   "joinSelectivityDetails": &lt;date&gt;
+ *   "ryaStreamsDetails": {
+ *       "hostname": &lt;string&gt;
+ *       "port": &lt;int&gt;
+ *   }
  * }
  * </pre>
  */
@@ -91,13 +94,19 @@
     public static final String PROSPECTOR_DETAILS_KEY = "prospectorDetails";
     public static final String JOIN_SELECTIVITY_DETAILS_KEY = "joinSelectivitiyDetails";
 
+    public static final String RYA_STREAMS_DETAILS_KEY = "ryaStreamsDetails";
+    public static final String RYA_STREAMS_HOSTNAME_KEY = "hostname";
+    public static final String RYA_STREAMS_PORT_KEY = "port";
+
     /**
-     * Serializes {@link RyaDetails} to mongo {@link DBObject}.
-     * @param details - The details to be serialized.
-     * @return The mongo {@link DBObject}.
+     * Converts a {@link RyaDetails} object into its MongoDB {@link DBObject} equivalent.
+     *
+     * @param details - The details to convert. (not null)
+     * @return The MongoDB {@link DBObject} equivalent.
      */
     public static BasicDBObject toDBObject(final RyaDetails details) {
-        Preconditions.checkNotNull(details);
+        requireNonNull(details);
+
         final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start()
                 .add(INSTANCE_KEY, details.getRyaInstanceName())
                 .add(VERSION_KEY, details.getRyaVersion())
@@ -106,12 +115,29 @@
                 .add(PCJ_DETAILS_KEY, toDBObject(details.getPCJIndexDetails()))
                 .add(TEMPORAL_DETAILS_KEY, details.getTemporalIndexDetails().isEnabled())
                 .add(FREETEXT_DETAILS_KEY, details.getFreeTextIndexDetails().isEnabled());
+
         if(details.getProspectorDetails().getLastUpdated().isPresent()) {
             builder.add(PROSPECTOR_DETAILS_KEY, details.getProspectorDetails().getLastUpdated().get());
         }
+
         if(details.getJoinSelectivityDetails().getLastUpdated().isPresent()) {
             builder.add(JOIN_SELECTIVITY_DETAILS_KEY, details.getJoinSelectivityDetails().getLastUpdated().get());
         }
+
+        // If the Rya Streams Details are present, then add them.
+        if(details.getRyaStreamsDetails().isPresent()) {
+            final RyaStreamsDetails ryaStreamsDetails = details.getRyaStreamsDetails().get();
+
+            // The embedded object that holds onto the fields.
+            final DBObject ryaStreamsFields = BasicDBObjectBuilder.start()
+                    .add(RYA_STREAMS_HOSTNAME_KEY, ryaStreamsDetails.getHostname())
+                    .add(RYA_STREAMS_PORT_KEY, ryaStreamsDetails.getPort())
+                    .get();
+
+            // Add them to the main builder.
+            builder.add(RYA_STREAMS_DETAILS_KEY, ryaStreamsFields);
+        }
+
         return (BasicDBObject) builder.get();
     }
 
@@ -154,20 +180,38 @@
         return builder.get();
     }
 
+    /**
+     * Converts a MongoDB {@link DBObject} into its {@link RyaDetails} equivalent.
+     *
+     * @param mongoObj - The MongoDB object to convert. (not null)
+     * @return The equivalent {@link RyaDetails} object.
+     * @throws MalformedRyaDetailsException The MongoDB object could not be converted.
+     */
     public static RyaDetails toRyaDetails(final DBObject mongoObj) throws MalformedRyaDetailsException {
+        requireNonNull(mongoObj);
         final BasicDBObject basicObj = (BasicDBObject) mongoObj;
         try {
-            return RyaDetails.builder()
-                    .setRyaInstanceName(basicObj.getString(INSTANCE_KEY))
-                    .setRyaVersion(basicObj.getString(VERSION_KEY))
-                    .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY)))
-                    //RYA-215            .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY)))
-                    .setPCJIndexDetails(getPCJIndexDetails(basicObj))
-                    .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY)))
-                    .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY)))
-                    .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY))))
-                    .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY))))
-                    .build();
+            final RyaDetails.Builder builder = RyaDetails.builder()
+                .setRyaInstanceName(basicObj.getString(INSTANCE_KEY))
+                .setRyaVersion(basicObj.getString(VERSION_KEY))
+                .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY)))
+                //RYA-215            .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY)))
+                .setPCJIndexDetails(getPCJIndexDetails(basicObj))
+                .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY)))
+                .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY)))
+                .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY))))
+                .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY))));
+
+            // If the Rya Streams Details are present, then add them.
+            if(basicObj.containsField(RYA_STREAMS_DETAILS_KEY)) {
+                final BasicDBObject streamsObject = (BasicDBObject) basicObj.get(RYA_STREAMS_DETAILS_KEY);
+                final String hostname = streamsObject.getString(RYA_STREAMS_HOSTNAME_KEY);
+                final int port = streamsObject.getInt(RYA_STREAMS_PORT_KEY);
+                builder.setRyaStreamsDetails(new RyaStreamsDetails(hostname, port));
+            }
+
+            return builder.build();
+
         } catch(final Exception e) {
             throw new MalformedRyaDetailsException("Failed to make RyaDetail from Mongo Object, it is malformed.", e);
         }
@@ -213,14 +257,15 @@
     }
 
     /**
-     * Exception thrown when a MongoDB {@link DBObject} is malformed when attemptin
-     * to adapt it into a {@link RyaDetails}.
+     * Indicates a MongoDB {@link DBObject} was malformed when attempting
+     * to convert it into a {@link RyaDetails} object.
      */
     public static class MalformedRyaDetailsException extends Exception {
         private static final long serialVersionUID = 1L;
 
         /**
-         * Creates a new {@link MalformedRyaDetailsException}
+         * Creates a new {@link MalformedRyaDetailsException}.
+         *
          * @param message - The message to be displayed by the exception.
          * @param e - The source cause of the exception.
          */
@@ -228,4 +273,4 @@
             super(message, e);
         }
     }
-}
+}
\ No newline at end of file
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
index 0ea9456..f5845c2 100644
--- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
@@ -32,6 +32,7 @@
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
 import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
 import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
 import org.apache.rya.mongodb.instance.MongoDetailsAdapter.MalformedRyaDetailsException;
 import org.junit.Test;
@@ -72,6 +73,7 @@
                 .setFreeTextDetails(new FreeTextIndexDetails(true))
                 .setProspectorDetails(new ProspectorDetails(Optional.fromNullable(new Date(0L))))
                 .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.fromNullable(new Date(1L))))
+                .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 6))
                 .build();
 
         final BasicDBObject actual = MongoDetailsAdapter.toDBObject(details);
@@ -100,7 +102,8 @@
                         + "temporalDetails : true,"
                         + "freeTextDetails : true,"
                         + "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"},"
-                        + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}"
+                        + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"},"
+                        + "ryaStreamsDetails : { hostname : \"localhost\" , port : 6}"
                         + "}"
                 );
 
@@ -134,7 +137,8 @@
                         + "temporalDetails : true,"
                         + "freeTextDetails : true,"
                         + "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"},"
-                        + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}"
+                        + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"},"
+                        + "ryaStreamsDetails : { hostname : \"localhost\" , port : 6}"
                         + "}"
                 );
 
@@ -163,6 +167,7 @@
                 .setFreeTextDetails(new FreeTextIndexDetails(true))
                 .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(new Date(0L))))
                 .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(new Date(1L))))
+                .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 6))
                 .build();
 
         assertEquals(expected, actual);
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java b/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java
new file mode 100644
index 0000000..92a4c44
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.instance.RyaDetailsUpdater;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A base class that implements the core functionality of the {@link SetRyaStreamsConfiguration} interactor.
+ * Subclasses just need to implement {@link #getRyaDetailsRepo(String)} so that the common code may update
+ * any implementation of that repository.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class SetRyaStreamsConfigurationBase implements SetRyaStreamsConfiguration {
+
+    private final InstanceExists instanceExists;
+
+    /**
+     * Constructs an instance of {@link SetRyaStreamsConfigurationBase}.
+     *
+     * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+     */
+    public SetRyaStreamsConfigurationBase(final InstanceExists instanceExists) {
+        this.instanceExists = requireNonNull(instanceExists);
+    }
+
+    /**
+     * Get a {@link RyaDetailsRepository} that is connected to a specific instance of Rya.
+     *
+     * @param ryaInstance - The Rya instance the repository must be connected to. (not null)
+     * @return A {@link RyaDetailsRepository} connected to the specified Rya instance.
+     */
+    protected abstract RyaDetailsRepository getRyaDetailsRepo(String ryaInstance);
+
+    @Override
+    public void setRyaStreamsConfiguration(final String ryaInstance, final RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException{
+        requireNonNull(ryaInstance);
+        requireNonNull(streamsDetails);
+
+        // Verify the Rya Instance exists.
+        if(!instanceExists.exists(ryaInstance)) {
+            throw new InstanceDoesNotExistException("There is no Rya instance named '" + ryaInstance + "' in this storage.");
+        }
+
+        // Update the old details object using the provided Rya Streams details.
+        final RyaDetailsRepository repo = getRyaDetailsRepo(ryaInstance);
+        try {
+            new RyaDetailsUpdater(repo).update(oldDetails -> {
+                final RyaDetails.Builder builder = RyaDetails.builder(oldDetails);
+                builder.setRyaStreamsDetails(streamsDetails);
+                return builder.build();
+            });
+        } catch (CouldNotApplyMutationException | RyaDetailsRepositoryException e) {
+            throw new RyaClientException("Unable to update which Rya Streams subsystem is used by the '" +
+                    ryaInstance + "' Rya instance.", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
index 17fddaa..fdabea9 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
@@ -23,6 +23,7 @@
 import java.util.Optional;
 
 import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.api.client.InstanceExists;
 import org.apache.rya.api.client.RyaClient;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -50,6 +51,8 @@
         requireNonNull(connector);
 
         // Build the RyaCommands option with the initialized commands.
+        final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, connector);
+
         return new RyaClient(
                 new AccumuloInstall(connectionDetails, connector),
                 new AccumuloCreatePCJ(connectionDetails, connector),
@@ -59,10 +62,11 @@
                 Optional.of(new AccumuloListIncrementalQueries(connectionDetails, connector)),
                 new AccumuloBatchUpdatePCJ(connectionDetails, connector),
                 new AccumuloGetInstanceDetails(connectionDetails, connector),
-                new AccumuloInstanceExists(connectionDetails, connector),
+                instanceExists,
                 new AccumuloListInstances(connectionDetails, connector),
                 Optional.of(new AccumuloAddUser(connectionDetails, connector)),
                 Optional.of(new AccumuloRemoveUser(connectionDetails, connector)),
+                new AccumuloSetRyaStreamsConfiguration(instanceExists, connector),
                 new AccumuloUninstall(connectionDetails, connector),
                 new AccumuloLoadStatements(connectionDetails, connector),
                 new AccumuloLoadStatementsFile(connectionDetails, connector),
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..f19d9fb
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.SetRyaStreamsConfiguration;
+import org.apache.rya.api.client.SetRyaStreamsConfigurationBase;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An Accumulo implementation of {@link SetRyaStreamsConfiguration}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase {
+
+    private final Connector connector;
+
+    /**
+     * Constructs an instance of {@link AccumuloSetRyaStreamsConfiguration}.
+     *
+     * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
+     */
+    public AccumuloSetRyaStreamsConfiguration(
+            final InstanceExists instanceExists,
+            final Connector connector) {
+        super(instanceExists);
+        this.connector = requireNonNull(connector);
+    }
+
+    @Override
+    protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) {
+        requireNonNull(ryaInstance);
+        return new AccumuloRyaInstanceDetailsRepository(connector, ryaInstance);
+    }
+}
\ No newline at end of file
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
index fbbec2a..5fa4877 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
@@ -66,6 +66,7 @@
                 new MongoListInstances(adminClient),
                 Optional.empty(),
                 Optional.empty(),
+                new MongoSetRyaStreamsConfiguration(instanceExists, adminClient),
                 new MongoUninstall(adminClient, instanceExists),
                 new MongoLoadStatements(connectionDetails, instanceExists),
                 new MongoLoadStatementsFile(connectionDetails, instanceExists),
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..592e663
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.SetRyaStreamsConfiguration;
+import org.apache.rya.api.client.SetRyaStreamsConfigurationBase;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+
+import com.mongodb.MongoClient;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A MongoDB implementation of {@link SetRyaStreamsConfiguration}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase {
+
+    private final MongoClient client;
+
+    /**
+     * Constructs an instance of {@link MongoSetRyaStreamsConfiguration}.
+     *
+     * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+     * @param client - The MongoDB client used to connect to the Rya storage. (not null)
+     */
+    public MongoSetRyaStreamsConfiguration(
+            final InstanceExists instanceExists,
+            final MongoClient client) {
+        super(instanceExists);
+        this.client = requireNonNull(client);
+    }
+
+    @Override
+    protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) {
+        requireNonNull(ryaInstance);
+        return new MongoRyaInstanceDetailsRepository(client, ryaInstance);
+    }
+}
\ No newline at end of file
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java
new file mode 100644
index 0000000..928a29e
--- /dev/null
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import org.apache.rya.accumulo.AccumuloITBase;
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link AccumuloSetRyaStreamsConfiguration}.
+ */
+public class AccumuloSetRyaStreamsConfigurationIT extends AccumuloITBase {
+
+    @Test(expected = InstanceDoesNotExistException.class)
+    public void instanceDoesNotExist() throws Exception {
+        final String ryaInstance = getRyaInstanceName();
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                getUsername(),
+                getPassword().toCharArray(),
+                getInstanceName(),
+                getZookeepers());
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+
+        // Skip the install step to create error causing situation.
+        final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+        ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+    }
+
+    @Test
+    public void updatesRyaDetails() throws Exception {
+        final String ryaInstance = getRyaInstanceName();
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                getUsername(),
+                getPassword().toCharArray(),
+                getInstanceName(),
+                getZookeepers());
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+
+        // Install an instance of Rya.
+        final Install installRya = ryaClient.getInstall();
+        final InstallConfiguration installConf = InstallConfiguration.builder()
+                .build();
+        installRya.install(ryaInstance, installConf);
+
+        // Fetch its details and show they do not have any RyaStreamsDetails.
+        com.google.common.base.Optional<RyaStreamsDetails> streamsDetails =
+                ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+        assertFalse(streamsDetails.isPresent());
+
+        // Set the details.
+        final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+        ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+
+        // Fetch its details again and show that they are now filled in.
+        streamsDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+        assertEquals(details, streamsDetails.get());
+    }
+}
\ No newline at end of file
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java
new file mode 100644
index 0000000..5fea578
--- /dev/null
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.mongo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Optional;
+
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.mongodb.MongoITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link MongoSetRyaStreamsConfiguration}.
+ */
+public class MongoSetRyaStreamsConfigurationIT extends MongoITBase {
+
+    @Test(expected = InstanceDoesNotExistException.class)
+    public void instanceDoesNotExist() throws Exception {
+        final RyaClient ryaClient = MongoRyaClientFactory.build(getConnectionDetails(), getMongoClient());
+
+        // Skip the install step to create error causing situation.
+        final String ryaInstance = conf.getRyaInstanceName();
+        final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+        ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+    }
+
+    @Test
+    public void updatesRyaDetails() throws Exception {
+        final RyaClient ryaClient = MongoRyaClientFactory.build(getConnectionDetails(), getMongoClient());
+
+        // Install an instance of Rya.
+        final String ryaInstance = conf.getRyaInstanceName();
+        final Install installRya = ryaClient.getInstall();
+        final InstallConfiguration installConf = InstallConfiguration.builder()
+                .build();
+        installRya.install(ryaInstance, installConf);
+
+        // Fetch its details and show they do not have any RyaStreamsDetails.
+        com.google.common.base.Optional<RyaStreamsDetails> streamsDetails =
+                ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+        assertFalse(streamsDetails.isPresent());
+
+        // Set the details.
+        final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+        ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+
+        // Fetch its details again and show that they are now filled in.
+        streamsDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+        assertEquals(details, streamsDetails.get());
+    }
+
+    private MongoConnectionDetails getConnectionDetails() {
+        final Optional<char[]> password = conf.getMongoPassword() != null ?
+                Optional.of(conf.getMongoPassword().toCharArray()) :
+                Optional.empty();
+
+        return new MongoConnectionDetails(
+                conf.getMongoHostname(),
+                Integer.parseInt(conf.getMongoPort()),
+                Optional.ofNullable(conf.getMongoUser()),
+                password);
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java
new file mode 100644
index 0000000..ee86e41
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.interactor.AddQuery;
+import org.apache.rya.streams.api.interactor.DeleteQuery;
+import org.apache.rya.streams.api.interactor.GetQuery;
+import org.apache.rya.streams.api.interactor.GetQueryResultStream;
+import org.apache.rya.streams.api.interactor.ListQueries;
+import org.apache.rya.streams.api.interactor.StartQuery;
+import org.apache.rya.streams.api.interactor.StopQuery;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides access to a set of Rya Streams functions.Statement
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaStreamsClient implements AutoCloseable {
+
+    private final AddQuery addQuery;
+    private final GetQuery getQuery;
+    private final DeleteQuery deleteQuery;
+    private final GetQueryResultStream<VisibilityStatement> getStatementResultStream;
+    private final GetQueryResultStream<VisibilityBindingSet> getBindingSetResultStream;
+    private final ListQueries listQueries;
+    private final StartQuery startQuery;
+    private final StopQuery stopQuery;
+
+    /**
+     * Constructs an instance of {@link RyaStreamsClient}.
+     */
+    public RyaStreamsClient(
+            final AddQuery addQuery,
+            final GetQuery getQuery,
+            final DeleteQuery deleteQuery,
+            final GetQueryResultStream<VisibilityStatement> getStatementResultStream,
+            final GetQueryResultStream<VisibilityBindingSet> getBindingSetResultStream,
+            final ListQueries listQueries,
+            final StartQuery startQuery,
+            final StopQuery stopQuery) {
+        this.addQuery = requireNonNull(addQuery);
+        this.getQuery = requireNonNull(getQuery);
+        this.deleteQuery = requireNonNull(deleteQuery);
+        this.getStatementResultStream = requireNonNull(getStatementResultStream);
+        this.getBindingSetResultStream = requireNonNull(getBindingSetResultStream);
+        this.listQueries = requireNonNull(listQueries);
+        this.startQuery = requireNonNull(startQuery);
+        this.stopQuery = requireNonNull(stopQuery);
+    }
+
+    /**
+     * @return The connected {@link AddQuery} interactor.
+     */
+    public AddQuery getAddQuery() {
+        return addQuery;
+    }
+
+    /**
+     * @return The connected {@link GetQuery} interactor.
+     */
+    public GetQuery getGetQuery() {
+        return getQuery;
+    }
+
+    /**
+     * @return The connected {@link DeleteQuery} interactor.
+     */
+    public DeleteQuery getDeleteQuery() {
+        return deleteQuery;
+    }
+
+    /**
+     * @return The connected {@link GetQueryResultStream} interactor for a query that produces
+     *   {@link VisibilityStatement}s.
+     */
+    public GetQueryResultStream<VisibilityStatement> getGetStatementResultStream() {
+        return getStatementResultStream;
+    }
+
+    /**
+     * @return The connected {@link GetQueryResultStream} interactor for a query that produces
+     *   {@link VisibilityBindingSet}s.
+     */
+    public GetQueryResultStream<VisibilityBindingSet>  getGetBindingSetResultStream() {
+        return getBindingSetResultStream;
+    }
+
+    /**
+     * @return The connected {@link ListQueries} interactor.
+     */
+    public ListQueries getListQueries() {
+        return listQueries;
+    }
+
+    /**
+     * @return The connected {@link StartQuery} interactor.
+     */
+    public StartQuery getStartQuery() {
+        return startQuery;
+    }
+
+    /**
+     * @return The connected {@link StopQuery} interactor.
+     */
+    public StopQuery getStopQuery() {
+        return stopQuery;
+    }
+
+    /**
+     * By defualt, this client doesn't close anything. If an implementation of the client
+     * requires closing components, then override this method.
+     */
+    @Override
+    public void close() throws Exception { }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java
new file mode 100644
index 0000000..1293714
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor;
+
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Get a {@link StreamsQuery} from Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface GetQuery {
+
+    /**
+     * Get a {@link StreamsQuery} from Rya Streams.
+     *
+     * @param queryId - Identifies the query to fetch. (not null)
+     * @return The {@link StreamsQuery} for the {@code queryId}; if one is stored for the ID.
+     * @throws RyaStreamsException The query could not be fetched.
+     */
+    public Optional<StreamsQuery> getQuery(UUID queryId) throws RyaStreamsException;
+}
\ No newline at end of file
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java
new file mode 100644
index 0000000..14b93ab
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor.defaults;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.GetQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Get a {@link StreamsQuery} from Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public class DefaultGetQuery implements GetQuery {
+    private final QueryRepository repository;
+
+    /**
+     * Constructs an instance of {@link DefaultGetQuery}.
+     *
+     * @param repository - The {@link QueryRepository} to get queries from. (not null)
+     */
+    public DefaultGetQuery(final QueryRepository repository) {
+        this.repository = requireNonNull(repository);
+    }
+
+    @Override
+    public Optional<StreamsQuery> getQuery(final UUID queryId) throws RyaStreamsException {
+        requireNonNull(queryId);
+        return repository.get(queryId);
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
new file mode 100644
index 0000000..9250d9d
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
+import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class KafkaRyaStreamsClientFactory {
+    private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class);
+
+    /**
+     * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams
+     * that is backed by Kafka.
+     *
+     * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null)
+     * @param kafkaHostname - The hostname of the Kafka Broker.
+     * @param kafkaPort - The port of the Kafka Broker.
+     * @return The initialized commands.
+     */
+    public static RyaStreamsClient make(
+            final String ryaInstance,
+            final String kafkaHostname,
+            final int kafkaPort) {
+        requireNonNull(ryaInstance);
+        requireNonNull(kafkaHostname);
+
+        // Setup Query Repository used by the Kafka Rya Streams subsystem.
+        final Producer<?, QueryChange> queryProducer =
+                makeProducer(kafkaHostname, kafkaPort, StringSerializer.class, QueryChangeSerializer.class);
+        final Consumer<?, QueryChange>queryConsumer =
+                fromStartConsumer(kafkaHostname, kafkaPort, StringDeserializer.class, QueryChangeDeserializer.class);
+        final String changeLogTopic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+        final QueryRepository queryRepo = new InMemoryQueryRepository(changeLog);
+
+        // Create the Rya Streams client that is backed by a Kafka Query Change Log.
+        return new RyaStreamsClient(
+                new DefaultAddQuery(queryRepo),
+                new DefaultGetQuery(queryRepo),
+                new DefaultDeleteQuery(queryRepo),
+                new KafkaGetQueryResultStream<>(kafkaHostname, "" + kafkaPort, VisibilityStatementDeserializer.class),
+                new KafkaGetQueryResultStream<>(kafkaHostname, "" + kafkaPort, VisibilityBindingSetDeserializer.class),
+                new DefaultListQueries(queryRepo),
+                new DefaultStartQuery(queryRepo),
+                new DefaultStopQuery(queryRepo)) {
+
+            /**
+             * Close the QueryRepository used by the returned client.
+             */
+            @Override
+            public void close() {
+                try {
+                    queryRepo.close();
+                } catch (final Exception e) {
+                    log.warn("Couldn't close a QueryRepository.", e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Create a {@link Producer} that is able to write to a topic in Kafka.
+     *
+     * @param kafkaHostname - The Kafka broker hostname. (not null)
+     * @param kafkaPort - The Kafka broker port.
+     * @param keySerializerClass - Serializes the keys. (not null)
+     * @param valueSerializerClass - Serializes the values. (not null)
+     * @return A {@link Producer} that can be used to write records to a topic.
+     */
+    private static <K, V> Producer<K, V> makeProducer(
+            final String kafkaHostname,
+            final int kakfaPort,
+            final Class<? extends Serializer<K>> keySerializerClass,
+            final Class<? extends Serializer<V>> valueSerializerClass) {
+        requireNonNull(kafkaHostname);
+        requireNonNull(keySerializerClass);
+        requireNonNull(valueSerializerClass);
+
+        final Properties producerProps = new Properties();
+        producerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort);
+        producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
+        producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
+        return new KafkaProducer<>(producerProps);
+    }
+
+    /**
+     * Create a {@link Consumer} that has a unique group ID and reads everything from a topic in Kafka
+     * starting at the earliest point by default.
+     *
+     * @param kafkaHostname - The Kafka broker hostname. (not null)
+     * @param kafkaPort - The Kafka broker port.
+     * @param keyDeserializerClass - Deserializes the keys. (not null)
+     * @param valueDeserializerClass - Deserializes the values. (not null)
+     * @return A {@link Consumer} that can be used to read records from a topic.
+     */
+    private static <K, V> Consumer<K, V> fromStartConsumer(
+            final String kafkaHostname,
+            final int kakfaPort,
+            final Class<? extends Deserializer<K>> keyDeserializerClass,
+            final Class<? extends Deserializer<V>> valueDeserializerClass) {
+        requireNonNull(kafkaHostname);
+        requireNonNull(keyDeserializerClass);
+        requireNonNull(valueDeserializerClass);
+
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort);
+        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
+        return new KafkaConsumer<>(consumerProps);
+    }
+}
\ No newline at end of file
diff --git a/extras/shell/pom.xml b/extras/shell/pom.xml
index 1a07400..fcca909 100644
--- a/extras/shell/pom.xml
+++ b/extras/shell/pom.xml
@@ -59,6 +59,10 @@
             <artifactId>rya.pcj.fluo.api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.streams.kafka</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.fluo</groupId>
             <artifactId>fluo-core</artifactId>
         </dependency>
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
index b6fddb0..b4168af 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
@@ -34,10 +34,15 @@
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.api.client.mongo.MongoConnectionDetails;
 import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
 import org.apache.rya.shell.SharedShellState.ConnectionState;
+import org.apache.rya.shell.SharedShellState.ShellState;
 import org.apache.rya.shell.SharedShellState.StorageType;
 import org.apache.rya.shell.util.ConnectorFactory;
 import org.apache.rya.shell.util.PasswordPrompt;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
@@ -222,31 +227,58 @@
     @CliCommand(value = CONNECT_INSTANCE_CMD, help = "Connect to a specific Rya instance")
     public void connectToInstance(
             @CliOption(key = {"instance"}, mandatory = true, help = "The name of the Rya instance the shell will interact with.")
-            final String instance) {
+            final String ryaInstance) {
+        final RyaClient ryaClient = sharedState.getShellState().getConnectedCommands().get();
         try {
-            final InstanceExists instanceExists = sharedState.getShellState().getConnectedCommands().get().getInstanceExists();
+            final InstanceExists instanceExists = ryaClient.getInstanceExists();
 
             // Make sure the requested instance exists.
-            if(!instanceExists.exists(instance)) {
-                throw new RuntimeException(String.format("'%s' does not match an existing Rya instance.", instance));
+            if(!instanceExists.exists(ryaInstance)) {
+                throw new RuntimeException(String.format("'%s' does not match an existing Rya instance.", ryaInstance));
+            }
+
+            // Store the instance name in the shared state.
+            sharedState.connectedToInstance(ryaInstance);
+
+            // If the Rya instance is configured to interact with Rya Streams, then connect the
+            // Rya Streams client to the shared state.
+            final com.google.common.base.Optional<RyaDetails> ryaDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance);
+            if(ryaDetails.isPresent()) {
+                final com.google.common.base.Optional<RyaStreamsDetails> streamsDetails = ryaDetails.get().getRyaStreamsDetails();
+                if(streamsDetails.isPresent()) {
+                    final String kafkaHostname = streamsDetails.get().getHostname();
+                    final int kafkaPort = streamsDetails.get().getPort();
+                    final RyaStreamsClient streamsClient = KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort);
+                    sharedState.connectedToRyaStreams(streamsClient);
+                }
             }
         } catch(final RyaClientException e) {
             throw new RuntimeException("Could not connect to Rya instance. Reason: " + e.getMessage(), e);
         }
-
-        // Store the instance name in the shared state.
-        sharedState.connectedToInstance(instance);
     }
 
     @CliCommand(value = DISCONNECT_COMMAND_NAME_CMD, help = "Disconnect the shell's Rya storage connection (Accumulo).")
     public void disconnect() {
+        final ShellState shellState = sharedState.getShellState();
+
         // If connected to Mongo, there is a client that needs to be closed.
-        final com.google.common.base.Optional<MongoClient> mongoAdminClient = sharedState.getShellState().getMongoAdminClient();
+        final com.google.common.base.Optional<MongoClient> mongoAdminClient = shellState.getMongoAdminClient();
         if(mongoAdminClient.isPresent()) {
             mongoAdminClient.get().close();
         }
 
+        // If connected to Rya Streams, then close the associated resources.
+        final com.google.common.base.Optional<RyaStreamsClient> streamsClient = shellState.getRyaStreamsCommands();
+        if(streamsClient.isPresent()) {
+            try {
+                streamsClient.get().close();
+            } catch (final Exception e) {
+                System.err.print("Could not close the RyaStreamsClient.");
+                e.printStackTrace();
+            }
+        }
+
         // Update the shared state to disconnected.
         sharedState.disconnected();
     }
-}
+}
\ No newline at end of file
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
new file mode 100644
index 0000000..5f7df84
--- /dev/null
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.shell.SharedShellState.ConnectionState;
+import org.apache.rya.shell.util.SparqlPrompt;
+import org.apache.rya.shell.util.StreamsQueryFormatter;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Optional;
+
+/**
+ * Rya Shell commands used to interact with the Rya Streams subsystem.
+ */
+@Component
+public class RyaStreamsCommands implements CommandMarker {
+
+    public static final String STREAMS_CONFIGURE_CMD = "streams-configure";
+    public static final String STREAMS_DETAILS_CMD = "streams-details";
+    public static final String STREAM_QUERIES_ADD_CMD = "streams-queries-add";
+    public static final String STREAM_QUERIES_DELETE_CMD = "streams-queries-delete";
+    public static final String STREAM_QUERIES_START_CMD = "streams-queries-start";
+    public static final String STREAM_QUERIES_STOP_CMD = "streams-queries-stop";
+    public static final String STREAM_QUERIES_LIST_CMD = "streams-queries-list";
+    public static final String STREAM_QUERIES_DETAILS_CMD = "streams-queries-details";
+
+    private final SharedShellState state;
+    private final SparqlPrompt sparqlPrompt;
+
+    /**
+     * Constructs an instance of {@link RyaStreamsCommands}.
+     *
+     * @param state - Holds shared state between all of the command classes. (not null)
+     * @param sparqlPrompt - Prompts a user for a SPARQL query. (not null)
+     */
+    @Autowired
+    public RyaStreamsCommands(
+            final SharedShellState state,
+            final SparqlPrompt sparqlPrompt) {
+        this.state = requireNonNull(state);
+        this.sparqlPrompt = requireNonNull(sparqlPrompt);
+    }
+
+    /**
+     * Enables commands that become available when connected to a Rya instance.
+     */
+    @CliAvailabilityIndicator({
+        STREAMS_CONFIGURE_CMD,
+        STREAMS_DETAILS_CMD})
+    public boolean areConfigCommandsAvailable() {
+        return state.getShellState().getConnectionState() == ConnectionState.CONNECTED_TO_INSTANCE;
+    }
+
+    /**
+     * Enables commands that become available when a Rya instance has a configured Rya Streams subsystem to use.
+     */
+    @CliAvailabilityIndicator({
+        STREAM_QUERIES_ADD_CMD,
+        STREAM_QUERIES_DELETE_CMD,
+        STREAM_QUERIES_START_CMD,
+        STREAM_QUERIES_STOP_CMD,
+        STREAM_QUERIES_LIST_CMD,
+        STREAM_QUERIES_DETAILS_CMD})
+    public boolean areQueriesCommandsAvailable() {
+        return state.getShellState().getRyaStreamsCommands().isPresent();
+    }
+
+    @CliCommand(value = STREAMS_CONFIGURE_CMD, help = "Connect a Rya Streams subsystem to a Rya Instance.")
+    public String configureRyaStreams(
+            @CliOption(key = {"kafkaHostname"}, mandatory = true, help = "The hostname of the Kafka Broker.")
+            final String kafkaHostname,
+            @CliOption(key = {"kafkaPort"}, mandatory = true, help = "The port of the Kafka Broker.")
+            final int kafkaPort) {
+
+        // If this instance was connected to a different Rya Streams subsystem, then close that client.
+        final Optional<RyaStreamsClient> oldClient = state.getShellState().getRyaStreamsCommands();
+        if(oldClient.isPresent()) {
+            try {
+                oldClient.get().close();
+            } catch (final Exception e) {
+                System.err.print("Warning: Could not close the old Rya Streams Client.");
+                e.printStackTrace();
+            }
+        }
+
+        // Update the Rya Details for the connected Rya Instance.
+        final String ryaInstance = state.getShellState().getRyaInstanceName().get();
+        final RyaClient ryaClient = state.getShellState().getConnectedCommands().get();
+        try {
+            final RyaStreamsDetails streamsDetails = new RyaStreamsDetails(kafkaHostname, kafkaPort);
+            ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, streamsDetails);
+        } catch (final RyaClientException e) {
+            throw new RuntimeException("Could not update the Rya instance's Rya Details to include the new " +
+                    "information. This command failed to complete.", e);
+        }
+
+        // Connect a Rya Streams Client and set it in the shared state.
+        final RyaStreamsClient newClient = KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort);
+        state.connectedToRyaStreams(newClient);
+
+        // Return a message that indicates the operation was successful.
+        if(oldClient.isPresent()) {
+            return "The Rya Streams subsystem that this Rya instance uses has been changed. Any queries that were " +
+                    "maintained by the previous subsystem will need to be migrated to the new one.";
+        } else {
+            return "The Rya Instance has been updated to use the provided Rya Streams subsystem. " +
+                    "Rya Streams commands are now avaiable while connected to this instance.";
+        }
+    }
+
+    @CliCommand(value = STREAMS_DETAILS_CMD, help = "Print information about which Rya Streams subsystem the Rya instance is connected to.")
+    public String printRyaStreamsDetails() {
+        final String ryaInstance = state.getShellState().getRyaInstanceName().get();
+        final RyaClient client = state.getShellState().getConnectedCommands().get();
+        try {
+            // Handle the case where the instance does not have Rya Details.
+            final Optional<RyaDetails> details = client.getGetInstanceDetails().getDetails(ryaInstance);
+            if(!details.isPresent()) {
+                return "This instance does not have any Rya Details, so it is unable to be connected to the Rya Streams subsystem.";
+            }
+
+            // Print a message based on if the instance is connected to Rya Streams.
+            final Optional<RyaStreamsDetails> streamsDetails = details.get().getRyaStreamsDetails();
+            if(!streamsDetails.isPresent()) {
+                return "This instance of Rya has not been configured to use a Rya Streams subsystem.";
+            }
+
+            // Print the details about which Rya Streams subsystem is being used.
+            return "Kafka Hostname: " + streamsDetails.get().getHostname() + ", Kafka Port: " + streamsDetails.get().getPort();
+
+        } catch (final RyaClientException e) {
+            throw new RuntimeException("Could not fetch the Rya Details for this Rya instance.", e);
+        }
+    }
+
+    @CliCommand(value = STREAM_QUERIES_ADD_CMD, help = "Add a SPARQL query to the Rya Streams subsystem.")
+    public String addQuery(
+            @CliOption(key = {"inactive"}, mandatory = false, unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
+                       help = "Setting this flag will add the query, but not run it. (default: false)")
+            final boolean inactive) {
+        final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+
+        // Prompt the user for the SPARQL that defines the query.
+        try {
+            final Optional<String> sparql = sparqlPrompt.getSparql();
+
+            // If the user aborted the prompt, return.
+            if(!sparql.isPresent()) {
+                return "";
+            }
+
+            final StreamsQuery streamsQuery = streamsClient.getAddQuery().addQuery(sparql.get(), !inactive);
+            return "The added query's ID is " + streamsQuery.getQueryId();
+
+        } catch (final IOException | RyaStreamsException e) {
+            throw new RuntimeException("Unable to add the SPARQL query to the Rya Streams subsystem.", e);
+        }
+    }
+
+    @CliCommand(value = STREAM_QUERIES_DELETE_CMD, help = "Delete a SPARQL query from the Rya Streams subsystem.")
+    public String deleteQuery(
+            @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to remove.")
+            final String queryId) {
+
+        final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+        final UUID id = UUID.fromString(queryId);
+        try {
+            streamsClient.getDeleteQuery().delete(id);
+        } catch (final RyaStreamsException e) {
+            throw new RuntimeException("Could not delete the query from the Rya Streams subsystem.", e);
+        }
+        return "The query has been deleted.";
+    }
+
+    @CliCommand(value = STREAM_QUERIES_START_CMD, help = "Start processing a SPARQL query using the Rya Streams subsystem.")
+    public String startQuery(
+            @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to start processing.")
+            final String queryId) {
+        final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+
+        try {
+            // Ensure the query exists.
+            final UUID id = UUID.fromString(queryId);
+            final java.util.Optional<StreamsQuery> streamsQuery = streamsClient.getGetQuery().getQuery(id);
+            if(!streamsQuery.isPresent()) {
+                throw new RuntimeException("No Rya Streams query exists for ID " + queryId);
+            }
+
+            // Ensure it isn't already started.
+            if(streamsQuery.get().isActive()) {
+                return "That query is already running.";
+            }
+
+            // Start it.
+            streamsClient.getStartQuery().start(id);
+            return "The query will be processed by the Rya Streams subsystem.";
+
+        } catch (final RyaStreamsException e) {
+            throw new RuntimeException("Unable to start the Query.", e);
+        }
+    }
+
+    @CliCommand(value = STREAM_QUERIES_STOP_CMD, help = "Stop processing a SPARQL query using the Rya Streams subsystem.")
+    public String stopQuery(
+            @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to stop processing.")
+            final String queryId) {
+        final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+
+        try {
+            // Ensure the query exists.
+            final UUID id = UUID.fromString(queryId);
+            final java.util.Optional<StreamsQuery> streamsQuery = streamsClient.getGetQuery().getQuery(id);
+            if(!streamsQuery.isPresent()) {
+                throw new RuntimeException("No Rya Streams query exists for ID " + queryId);
+            }
+
+            // Ensure it isn't already stopped.
+            if(!streamsQuery.get().isActive()) {
+                return "That query is already stopped.";
+            }
+
+            // Stop it.
+            streamsClient.getStopQuery().stop(id);
+            return "The query will no longer be processed by the Rya Streams subsystem.";
+
+        } catch (final RyaStreamsException e) {
+            throw new RuntimeException("Unable to start the Query.", e);
+        }
+    }
+
+    @CliCommand(value = STREAM_QUERIES_LIST_CMD, help = "List the queries that are being managed by the configured Rya Streams subsystem.")
+    public String listQueries() {
+        final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+        try {
+            final Set<StreamsQuery> queries = streamsClient.getListQueries().all();
+            return StreamsQueryFormatter.format(queries);
+        } catch (final RyaStreamsException e) {
+            throw new RuntimeException("Unable to fetch the queries from the Rya Streams subsystem.", e);
+        } catch (final Exception e) {
+            throw new RuntimeException("Unable to print the query to the console.", e);
+        }
+    }
+
+    @CliCommand(value = STREAM_QUERIES_DETAILS_CMD, help = "Print detailed information about a specific query managed by the Rya Streams subsystem.")
+    public String printQueryDetails(
+            @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query whose details will be printed.")
+            final String queryId) {
+        final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+        final UUID id = UUID.fromString(queryId);
+        try {
+            final java.util.Optional<StreamsQuery> query = streamsClient.getGetQuery().getQuery(id);
+            if(!query.isPresent()) {
+                return "There is no query with the specified ID.";
+            }
+            return StreamsQueryFormatter.format(query.get());
+        } catch (final RyaStreamsException e) {
+            throw new RuntimeException("Unable to fetch the query from the Rya Streams subsystem.", e);
+        } catch (final Exception e) {
+            throw new RuntimeException("Unable to print the query to the console.", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java b/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java
index bb22fd3..58bcfbe 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java
@@ -26,6 +26,8 @@
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.streams.api.RyaStreamsClient;
 
 import com.google.common.base.Optional;
 import com.mongodb.MongoClient;
@@ -159,6 +161,34 @@
     }
 
     /**
+     * This method indicates a shift into a state where the shell may have to interact with the Rya Streams subsystem.
+     * <p/>
+     * Stores the {@link RyaStreamsClient} all Rya Streams commands will be executed against.
+     *
+     * @param ryaStreamsCommands - Rya Streams commands that will execute against the Rya Streams subsystem. (not null)
+     */
+    public void connectedToRyaStreams(
+            final RyaStreamsClient ryaStreamsCommands) {
+        requireNonNull(ryaStreamsCommands);
+
+        lock.lock();
+        try {
+            // Verify the Rya Shell is connected to an instance.
+            if(shellState.getConnectionState() != ConnectionState.CONNECTED_TO_INSTANCE) {
+                throw new IllegalStateException("You can not set the connected Rya Streams Client before connected to a Rya Instance.");
+            }
+
+            // Set the connected Rya Streams commands.
+            shellState = ShellState.builder( shellState )
+                    .setRyaStreamsCommands(ryaStreamsCommands)
+                    .build();
+
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
      * This method indicates a shift into the {@link DISCONNECTED} state.
      * <p/>
      * Clears all of the values associated with a Rya Storage/Instance connection.
@@ -225,6 +255,7 @@
         private final Optional<MongoConnectionDetails> mongoDetails;
         private final Optional<MongoClient> mongoAdminClient;
         private final Optional<RyaClient> connectedCommands;
+        private final Optional<RyaStreamsClient> ryaStreamsCommands;
 
         // Instance specific values.
         private final Optional<String> instanceName;
@@ -236,7 +267,8 @@
                 final Optional<MongoConnectionDetails> mongoDetails,
                 final Optional<MongoClient> mongoAdminClient,
                 final Optional<RyaClient> connectedCommands,
-                final Optional<String> instanceName) {
+                final Optional<String> instanceName,
+                final Optional<RyaStreamsClient> ryaStreamsCommands) {
             this.connectionState = requireNonNull(connectionState);
             this.storageType = requireNonNull(storageType);
             this.accumuloDetails = requireNonNull(accumuloDetails);
@@ -244,6 +276,7 @@
             this.mongoAdminClient = requireNonNull(mongoAdminClient);
             this.connectedCommands = requireNonNull(connectedCommands);
             this.instanceName = requireNonNull(instanceName);
+            this.ryaStreamsCommands = requireNonNull(ryaStreamsCommands);
         }
 
         /**
@@ -294,6 +327,15 @@
         }
 
         /**
+         * @return The {@link RyaStreamsClient} to use when a command on the shell is issued.
+         *   The value will not be present if the Rya Shell is not connected to an instance
+         *   whose {@link RyaDetails} indicate when Rya Streams system to use.
+         */
+        public Optional<RyaStreamsClient> getRyaStreamsCommands() {
+            return ryaStreamsCommands;
+        }
+
+        /**
          * @return The name of the Rya Instance the Rya Shell is issuing commands to.
          *   The value will not be present if the Rya Shell is not connected to a
          *   storage or if a target instance has not been set yet.
@@ -353,6 +395,7 @@
             private MongoConnectionDetails mongoDetails;
             private MongoClient mongoAdminClient;
             private RyaClient connectedCommands;
+            private RyaStreamsClient ryaStreamsCommands;
 
             // Instance specific values.
             private String instanceName;
@@ -375,6 +418,7 @@
                 this.mongoDetails = shellState.getMongoDetails().orNull();
                 this.mongoAdminClient = shellState.getMongoAdminClient().orNull();
                 this.connectedCommands = shellState.getConnectedCommands().orNull();
+                this.ryaStreamsCommands = shellState.getRyaStreamsCommands().orNull();
                 this.instanceName = shellState.getRyaInstanceName().orNull();
             }
 
@@ -435,6 +479,15 @@
             }
 
             /**
+             * @param ryaStreamsCommands - The {@link RyaStreamsClient} to use when a command on the shell is issued.
+             * @return This {@link Builder} so that method invocations may be chained.
+             */
+            public Builder setRyaStreamsCommands(@Nullable final RyaStreamsClient ryaStreamsCommands) {
+                this.ryaStreamsCommands = ryaStreamsCommands;
+                return this;
+            }
+
+            /**
              * @param instanceName - The name of the Rya Instance the Rya Shell is issuing commands to.
              * @return This {@link Builder} so that method invocations may be chained.
              */
@@ -454,7 +507,8 @@
                         Optional.fromNullable(mongoDetails),
                         Optional.fromNullable(mongoAdminClient),
                         Optional.fromNullable(connectedCommands),
-                        Optional.fromNullable(instanceName));
+                        Optional.fromNullable(instanceName),
+                        Optional.fromNullable(ryaStreamsCommands));
             }
         }
     }
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
new file mode 100644
index 0000000..6c06caf
--- /dev/null
+++ b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
+
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Pretty formats {@link StreamsQuery} objects.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class StreamsQueryFormatter {
+
+    /**
+     * Pretty formats a {@link StreamsQuery}.
+     *
+     * @param query - The query to format. (not null)
+     * @return The pretty formatted string.
+     * @throws Exception A problem was encountered while pretty formatting the SPARQL.
+     */
+    public static String format(final StreamsQuery query) throws Exception {
+        requireNonNull(query);
+
+        // Pretty format the SPARQL query.
+        final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(query.getSparql(), null);
+        final String prettySparql = new SPARQLQueryRenderer().render(parsedQuery);
+        final String[] lines = prettySparql.split("\n");
+
+        // Create the formatted string.
+        query.getQueryId();
+        query.isActive();
+
+        String.format(" QueryId: %s", query.getQueryId());
+
+        final StringBuilder builder = new StringBuilder();
+        builder.append(" Query ID: ").append( query.getQueryId() ) .append("\n");
+        builder.append("Is Active: ").append( query.isActive() ).append("\n");
+        builder.append("   SPARQL: ").append( lines[0] ).append("\n");
+
+        for(int i = 1; i < lines.length; i++) {
+            builder.append("           ").append(lines[i]).append("\n");
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Pretty formats a collection {@link StreamsQuery}s.
+     * They will be sorted based on their Query IDs.
+     *
+     * @param queries - The queries to format. (not null)
+     * @return The pretty formatted string.
+     * @throws Exception A problem was encountered while pretty formatting the SPARQL.
+     */
+    public static String format(final Collection<StreamsQuery> queries) throws Exception {
+        requireNonNull(queries);
+
+        if(queries.size() == 1) {
+            return format(queries.iterator().next());
+        }
+
+        // Sort the queries based on their IDs.
+        final List<StreamsQuery> sorted = Lists.newArrayList(queries);
+        sorted.sort((query1, query2) -> {
+            final String id1 = query1.getQueryId().toString();
+            final String id2 = query2.getQueryId().toString();
+            return id1.compareTo(id2);
+        });
+
+        // Format a list of the queries.
+        final StringBuilder builder = new StringBuilder();
+        builder.append("-----------------------------------------------\n");
+        for(final StreamsQuery query : sorted) {
+            builder.append( format(query) );
+            builder.append("-----------------------------------------------\n");
+        }
+        return builder.toString();
+    }
+}
\ No newline at end of file
diff --git a/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml b/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml
index 361bf27..2473af1 100644
--- a/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml
+++ b/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml
@@ -31,7 +31,6 @@
     <!-- Define the shell state bean that will be shared across all of the commands. -->
     <bean id="sharedShellState" class="org.apache.rya.shell.SharedShellState" />
     <bean id="passwordPrompt" class="org.apache.rya.shell.util.PasswordPrompt.JLinePasswordPrompt" />
-<!--     <bean id="installPrompt" class="org.apache.rya.shell.util.InstallPrompt.JLineAccumuloInstallPrompt" /> -->
     <bean id="installPrompt" class="org.apache.rya.shell.util.InstallPrompt.JLineInstallPropmpt" />
     <bean id="uninstallPrompt" class="org.apache.rya.shell.util.UninstallPrompt.JLineUninstallPrompt" />
     <bean id="sparqlPrompt" class="org.apache.rya.shell.util.SparqlPrompt.JLineSparqlPrompt" />
@@ -41,11 +40,11 @@
     <bean id="ryaConnectionCommands" class="org.apache.rya.shell.RyaConnectionCommands" />
     <bean id="ryaAdminCommands" class="org.apache.rya.shell.RyaAdminCommands" />
     <bean id="ryaCommands" class="org.apache.rya.shell.RyaCommands" />
+    <bean id="ryaStreamsCommands" class="org.apache.rya.shell.RyaStreamsCommands" />
     
     <!--  
     <bean id="springHelpCommands" class="org.springframework.shell.commands.HelpCommands" />
     <bean id="springScriptCommands" class="org.springframework.shell.commands.ScriptCommands" />
     <bean id="springExitCommands" class="org.springframework.shell.commands.ExitCommands" />
     -->
-    
 </beans>
\ No newline at end of file
diff --git a/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java b/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java
new file mode 100644
index 0000000..9f5a794
--- /dev/null
+++ b/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.GetInstanceDetails;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.SetRyaStreamsConfiguration;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.shell.util.SparqlPrompt;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.interactor.AddQuery;
+import org.apache.rya.streams.api.interactor.DeleteQuery;
+import org.apache.rya.streams.api.interactor.GetQuery;
+import org.apache.rya.streams.api.interactor.ListQueries;
+import org.apache.rya.streams.api.interactor.StartQuery;
+import org.apache.rya.streams.api.interactor.StopQuery;
+import org.junit.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link RyaStreamsCommands}.
+ */
+public class RyaStreamsCommandsTest {
+
+    @Test
+    public void configureRyaStreams() throws Exception {
+        // Mock the object that performs the configure operation.
+        final RyaClient mockCommands = mock(RyaClient.class);
+        final SetRyaStreamsConfiguration setStreams = mock(SetRyaStreamsConfiguration.class);
+        when(mockCommands.getSetRyaStreamsConfiguration()).thenReturn(setStreams);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands);
+        state.connectedToInstance("unitTest");
+
+        // Verify that no Rya Streams Client is set to the state.
+        assertFalse(state.getShellState().getRyaStreamsCommands().isPresent());
+
+        try {
+            // Execute the command.
+            final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+            final String message = commands.configureRyaStreams("localhost", 6);
+
+            // Verify the request was forwarded to the mocked interactor.
+            final RyaStreamsDetails expectedDetails = new RyaStreamsDetails("localhost", 6);
+            verify(setStreams).setRyaStreamsConfiguration(eq("unitTest"), eq(expectedDetails));
+
+            // Verify a RyaStreamsClient was created and added to the state.
+            assertTrue(state.getShellState().getRyaStreamsCommands().isPresent());
+
+            // Verify the correct message is reported.
+            final String expected = "The Rya Instance has been updated to use the provided Rya Streams subsystem. " +
+                    "Rya Streams commands are now avaiable while connected to this instance.";
+            assertEquals(expected, message);
+        } finally {
+            state.getShellState().getRyaStreamsCommands().get().close();
+        }
+    }
+
+    @Test
+    public void printRyaStreamsDetails_noRyaDetails() throws Exception {
+        // Mock the object that performs the configure operation.
+        final RyaClient mockCommands = mock(RyaClient.class);
+        final GetInstanceDetails getDetails = mock(GetInstanceDetails.class);
+        when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails);
+
+        // When getting the instance details, ensure they are not found.
+        when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.absent());
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands);
+        state.connectedToInstance("unitTest");
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+        final String message = commands.printRyaStreamsDetails();
+        final String expected = "This instance does not have any Rya Details, so it is unable to be connected to the Rya Streams subsystem.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void printRyaStreamsDetails_notConfigured() throws Exception {
+        // Mock the object that performs the configure operation.
+        final RyaClient mockCommands = mock(RyaClient.class);
+        final GetInstanceDetails getDetails = mock(GetInstanceDetails.class);
+        when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails);
+
+        // When getting the instance details, ensure they do not have RyaStreamsDetails to print.
+        final RyaDetails details = mock(RyaDetails.class);
+        when(details.getRyaStreamsDetails()).thenReturn(Optional.absent());
+        when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.of(details));
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands);
+        state.connectedToInstance("unitTest");
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+        final String message = commands.printRyaStreamsDetails();
+        final String expected = "This instance of Rya has not been configured to use a Rya Streams subsystem.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void printRyaStreamsDetails_configured() throws Exception {
+        // Mock the object that performs the configure operation.
+        final RyaClient mockCommands = mock(RyaClient.class);
+        final GetInstanceDetails getDetails = mock(GetInstanceDetails.class);
+        when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails);
+
+        // When getting the instance details, ensure they do have RyaStreamsDetails to print.
+        final RyaDetails details = mock(RyaDetails.class);
+        when(details.getRyaStreamsDetails()).thenReturn(Optional.of(new RyaStreamsDetails("localhost", 6)));
+        when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.of(details));
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands);
+        state.connectedToInstance("unitTest");
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+        final String message = commands.printRyaStreamsDetails();
+        final String expected = "Kafka Hostname: localhost, Kafka Port: 6";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void addQuery_userAbortsSparqlPrompt() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final AddQuery addQuery = mock(AddQuery.class);
+        when(mockClient.getAddQuery()).thenReturn(addQuery);
+
+        // Mock a SPARQL prompt that a user aborts.
+        final SparqlPrompt prompt = mock(SparqlPrompt.class);
+        when(prompt.getSparql()).thenReturn(Optional.absent());
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt);
+        final String message = commands.addQuery(false);
+
+        // Verify a message is printed to the user.
+        assertEquals("", message);
+    }
+
+    @Test
+    public void addQuery() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final AddQuery addQuery = mock(AddQuery.class);
+        when(mockClient.getAddQuery()).thenReturn(addQuery);
+
+        final StreamsQuery addedQuery = new StreamsQuery(UUID.randomUUID(), "query", true);
+        when(addQuery.addQuery(eq("query"), eq(true))).thenReturn(addedQuery);
+
+        // Mock a SPARQL prompt that a user entered a query through.
+        final SparqlPrompt prompt = mock(SparqlPrompt.class);
+        when(prompt.getSparql()).thenReturn(Optional.of("query"));
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt);
+        final String message = commands.addQuery(false);
+
+        // Verify the interactor was invoked with the provided input.
+        verify(addQuery).addQuery("query", true);
+
+        // Verify a message is printed to the user.
+        final String expected = "The added query's ID is " + addedQuery.getQueryId();
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void deleteQuery() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final DeleteQuery deleteQuery = mock(DeleteQuery.class);
+        when(mockClient.getDeleteQuery()).thenReturn(deleteQuery);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+        final UUID queryId = UUID.randomUUID();
+        final String message = commands.deleteQuery(queryId.toString());
+
+        // Verify the interactor was invoked with the provided parameters.
+        verify(deleteQuery).delete(eq(queryId));
+
+        // Verify a message is printed to the user.
+        final String expected = "The query has been deleted.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void startQuery() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final StartQuery startQuery = mock(StartQuery.class);
+        when(mockClient.getStartQuery()).thenReturn(startQuery);
+        final GetQuery getQuery = mock(GetQuery.class);
+        when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Report the query as not running.
+        final UUID queryId = UUID.randomUUID();
+        when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", false)));
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+        final String message = commands.startQuery(queryId.toString());
+
+        // Verify the interactor was invoked with the provided parameters.
+        verify(startQuery).start(queryId);
+
+        // Verify a message is printed to the user.
+        final String expected = "The query will be processed by the Rya Streams subsystem.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void startQuery_alreadyRunning() throws Exception{
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final StartQuery startQuery = mock(StartQuery.class);
+        when(mockClient.getStartQuery()).thenReturn(startQuery);
+        final GetQuery getQuery = mock(GetQuery.class);
+        when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Report the query as running.
+        final UUID queryId = UUID.randomUUID();
+        when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", true)));
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+        final String message = commands.startQuery(queryId.toString());
+
+        // Verify the interactor was not invoked.
+        verify(startQuery, never()).start(queryId);
+
+        // Verify a message is printed to the user.
+        final String expected = "That query is already running.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void stopQuery() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final StopQuery stopQuery = mock(StopQuery.class);
+        when(mockClient.getStopQuery()).thenReturn(stopQuery);
+        final GetQuery getQuery = mock(GetQuery.class);
+        when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Report the query as running.
+        final UUID queryId = UUID.randomUUID();
+        when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", true)));
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+        final String message = commands.stopQuery(queryId.toString());
+
+        // Verify the interactor was invoked with the provided parameters.
+        verify(stopQuery).stop(queryId);
+
+        // Verify a message is printed to the user.
+        final String expected = "The query will no longer be processed by the Rya Streams subsystem.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void stopQuery_alreadyStopped() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final StopQuery stopQuery = mock(StopQuery.class);
+        when(mockClient.getStopQuery()).thenReturn(stopQuery);
+        final GetQuery getQuery = mock(GetQuery.class);
+        when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Report the query as not running.
+        final UUID queryId = UUID.randomUUID();
+        when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", false)));
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+        final String message = commands.stopQuery(queryId.toString());
+
+        // Verify the interactor was not invoked with the provided parameters.
+        verify(stopQuery, never()).stop(queryId);
+
+        // Verify a message is printed to the user.
+        final String expected = "That query is already stopped.";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void listQueries() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final ListQueries listQueries = mock(ListQueries.class);
+        when(mockClient.getListQueries()).thenReturn(listQueries);
+
+        final Set<StreamsQuery> queries = Sets.newHashSet(
+                new StreamsQuery(
+                        UUID.fromString("33333333-3333-3333-3333-333333333333"),
+                        "SELECT * WHERE { ?person <urn:worksAt> ?business . }",
+                        true),
+                new StreamsQuery(
+                        UUID.fromString("11111111-1111-1111-1111-111111111111"),
+                        "SELECT * WHERE { ?a ?b ?c . }",
+                        true),
+                new StreamsQuery(
+                        UUID.fromString("22222222-2222-2222-2222-222222222222"),
+                        "SELECT * WHERE { ?d ?e ?f . }",
+                        false));
+        when(listQueries.all()).thenReturn(queries);
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+        final String message = commands.listQueries();
+
+        // Verify the correct report is returned.
+        final String expected =
+                "-----------------------------------------------\n" +
+                " Query ID: 11111111-1111-1111-1111-111111111111\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?a ?b ?c\n" +
+                "           where {\n" +
+                "             ?a ?b ?c.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n" +
+                " Query ID: 22222222-2222-2222-2222-222222222222\n" +
+                "Is Active: false\n" +
+                "   SPARQL: select ?d ?e ?f\n" +
+                "           where {\n" +
+                "             ?d ?e ?f.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n" +
+                " Query ID: 33333333-3333-3333-3333-333333333333\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?person ?business\n" +
+                "           where {\n" +
+                "             ?person <urn:worksAt> ?business.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n";
+        assertEquals(expected, message);
+    }
+
+    @Test
+    public void printQueryDetails() throws Exception {
+        // Mock the object that performs the rya streams operation.
+        final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+        final GetQuery getQuery = mock(GetQuery.class);
+        when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+        final UUID queryId = UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa");
+        final StreamsQuery query = new StreamsQuery(queryId, "SELECT * WHERE { ?a ?b ?c . }", true);
+        when(getQuery.getQuery(queryId)).thenReturn(java.util.Optional.of(query));
+
+        // Mock a shell state and connect it to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+        state.connectedToInstance("unitTest");
+        state.connectedToRyaStreams(mockClient);
+
+        // Execute the command.
+        final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+        final String message = commands.printQueryDetails(queryId.toString());
+
+        // Verify the correct report is returned.
+        final String expected =
+                " Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?a ?b ?c\n" +
+                "           where {\n" +
+                "             ?a ?b ?c.\n" +
+                "           }\n";
+        assertEquals(expected, message);
+    }
+}
\ No newline at end of file
diff --git a/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java b/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java
index b5f136c..e1ad039 100644
--- a/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java
+++ b/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java
@@ -25,6 +25,7 @@
 import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.shell.SharedShellState.ConnectionState;
 import org.apache.rya.shell.SharedShellState.ShellState;
+import org.apache.rya.streams.api.RyaStreamsClient;
 import org.junit.Test;
 
 /**
@@ -163,4 +164,34 @@
                 .build();
         assertEquals(expected, state.getShellState());
     }
+
+    @Test(expected = IllegalStateException.class)
+    public void connectedToRyaStreams_notConnectedToInstance() throws Exception {
+        // Create a shell state that is not connected to an instance.
+        final SharedShellState state = new SharedShellState();
+
+        // Connecting to a Rya Streams system fails.
+        state.connectedToRyaStreams( mock(RyaStreamsClient.class) );
+    }
+
+    @Test
+    public void connectedToRyaStreams() {
+        // Create a shell state.
+        final SharedShellState state = new SharedShellState();
+
+        // Connect to Accumulo.
+        final AccumuloConnectionDetails connectionDetails = mock(AccumuloConnectionDetails.class);
+        final RyaClient connectedCommands = mock(RyaClient.class);
+        state.connectedToAccumulo(connectionDetails, connectedCommands);
+
+        // Connect to an Instance.
+        state.connectedToInstance("instance");
+
+        // Connect to Rya Streams for the instance.
+        final RyaStreamsClient streamsClient = mock(RyaStreamsClient.class);
+        state.connectedToRyaStreams(streamsClient);
+
+        // Verify the state.
+        assertEquals(streamsClient, state.getShellState().getRyaStreamsCommands().get());
+    }
 }
\ No newline at end of file
diff --git a/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java b/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java
new file mode 100644
index 0000000..8e5d251
--- /dev/null
+++ b/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link StreamsQueryFormatter}.
+ */
+public class StreamsQueryFormatterTest {
+
+    @Test
+    public void formatQuery() throws Exception {
+        // Format the query.
+        final StreamsQuery query = new StreamsQuery(
+                UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa"),
+                "SELECT * WHERE { ?a ?b ?c . }",
+                true);
+        final String formatted = StreamsQueryFormatter.format(query);
+
+        // Ensure it has the expected format.
+        final String expected =
+                " Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?a ?b ?c\n" +
+                "           where {\n" +
+                "             ?a ?b ?c.\n" +
+                "           }\n";
+
+        assertEquals(expected, formatted);
+    }
+
+    @Test
+    public void formatQueries() throws Exception {
+        // Format the queries.
+        final Set<StreamsQuery> queries = Sets.newHashSet(
+                new StreamsQuery(
+                        UUID.fromString("33333333-3333-3333-3333-333333333333"),
+                        "SELECT * WHERE { ?person <urn:worksAt> ?business . }",
+                        true),
+                new StreamsQuery(
+                        UUID.fromString("11111111-1111-1111-1111-111111111111"),
+                        "SELECT * WHERE { ?a ?b ?c . }",
+                        true),
+                new StreamsQuery(
+                        UUID.fromString("22222222-2222-2222-2222-222222222222"),
+                        "SELECT * WHERE { ?d ?e ?f . }",
+                        false));
+
+        final String formatted = StreamsQueryFormatter.format(queries);
+
+        // Ensure it has the expected format.
+        final String expected =
+                "-----------------------------------------------\n" +
+                " Query ID: 11111111-1111-1111-1111-111111111111\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?a ?b ?c\n" +
+                "           where {\n" +
+                "             ?a ?b ?c.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n" +
+                " Query ID: 22222222-2222-2222-2222-222222222222\n" +
+                "Is Active: false\n" +
+                "   SPARQL: select ?d ?e ?f\n" +
+                "           where {\n" +
+                "             ?d ?e ?f.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n" +
+                " Query ID: 33333333-3333-3333-3333-333333333333\n" +
+                "Is Active: true\n" +
+                "   SPARQL: select ?person ?business\n" +
+                "           where {\n" +
+                "             ?person <urn:worksAt> ?business.\n" +
+                "           }\n" +
+                "-----------------------------------------------\n";
+        assertEquals(expected, formatted);
+    }
+}
\ No newline at end of file
diff --git a/extras/shell/src/test/resources/RyaShellTest-context.xml b/extras/shell/src/test/resources/RyaShellTest-context.xml
index f7ffe0f..54c44cf 100644
--- a/extras/shell/src/test/resources/RyaShellTest-context.xml
+++ b/extras/shell/src/test/resources/RyaShellTest-context.xml
@@ -60,4 +60,5 @@
     <bean id="ryaConnectionCommands" class="org.apache.rya.shell.RyaConnectionCommands" />
     <bean id="ryaCommands" class="org.apache.rya.shell.RyaCommands" />
     <bean id="ryaAdminCommands" class="org.apache.rya.shell.RyaAdminCommands" />
+    <bean id="ryaStreamsCommands" class="org.apache.rya.shell.RyaStreamsCommands" />
 </beans>
\ No newline at end of file