diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index f4b7b25..dca040f 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -20,7 +20,9 @@
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -33,6 +35,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.AbstractScheduledService;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import info.aduna.iteration.CloseableIteration;
@@ -46,7 +50,7 @@
  * Thread safe.
  */
 @DefaultAnnotation(NonNull.class)
-public class InMemoryQueryRepository implements QueryRepository {
+public class InMemoryQueryRepository extends AbstractScheduledService implements QueryRepository {
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class);
 
     private final ReentrantLock lock = new ReentrantLock();
@@ -67,20 +71,34 @@
     private final Map<UUID, StreamsQuery> queriesCache = new HashMap<>();
 
     /**
+     * The listeners to be notified when new QueryChangeLogs come in.
+     */
+    private final List<QueryChangeLogListener> listeners = new ArrayList<>();
+
+    /**
+     * The {@link Scheduler} the repository uses to periodically poll for query updates.
+     */
+    private final Scheduler scheduler;
+
+    /**
      * Constructs an instance of {@link InMemoryQueryRepository}.
      *
      * @param changeLog - The change log that this repository will maintain and be based on. (not null)
+     * @param scheduler - The {@link Scheduler} this service uses to periodically check for query updates. (not null)
      */
-    public InMemoryQueryRepository(final QueryChangeLog changeLog) {
+    public InMemoryQueryRepository(final QueryChangeLog changeLog, final Scheduler scheduler) {
         this.changeLog = requireNonNull(changeLog);
+        this.scheduler = requireNonNull(scheduler);
     }
 
     @Override
-    public StreamsQuery add(final String query, final boolean isActive) throws QueryRepositoryException {
+    public StreamsQuery add(final String query, final boolean isActive)
+            throws QueryRepositoryException, IllegalStateException {
         requireNonNull(query);
 
         lock.lock();
         try {
+            checkState();
             // First record the change to the log.
             final UUID queryId = UUID.randomUUID();
             final QueryChange change = QueryChange.create(queryId, query, isActive);
@@ -100,11 +118,12 @@
     }
 
     @Override
-    public Optional<StreamsQuery> get(final UUID queryId) throws QueryRepositoryException {
+    public Optional<StreamsQuery> get(final UUID queryId) throws QueryRepositoryException, IllegalStateException {
         requireNonNull(queryId);
 
         lock.lock();
         try {
+            checkState();
             // Update the cache to represent what is currently in the log.
             updateCache();
 
@@ -115,11 +134,13 @@
     }
 
     @Override
-    public void updateIsActive(final UUID queryId, final boolean isActive) throws QueryRepositoryException {
+    public void updateIsActive(final UUID queryId, final boolean isActive)
+            throws QueryRepositoryException, IllegalStateException {
         requireNonNull(queryId);
 
         lock.lock();
         try {
+            checkState();
             // Update the cache to represent what is currently in the log.
             updateCache();
 
@@ -140,11 +161,12 @@
     }
 
     @Override
-    public void delete(final UUID queryId) throws QueryRepositoryException {
+    public void delete(final UUID queryId) throws QueryRepositoryException, IllegalStateException {
         requireNonNull(queryId);
 
         lock.lock();
         try {
+            checkState();
             // First record the change to the log.
             final QueryChange change = QueryChange.delete(queryId);
             changeLog.write(change);
@@ -157,9 +179,10 @@
     }
 
     @Override
-    public Set<StreamsQuery> list() throws QueryRepositoryException {
+    public Set<StreamsQuery> list() throws QueryRepositoryException, IllegalStateException {
         lock.lock();
         try {
+            checkState();
             // Update the cache to represent what is currently in the log.
             updateCache();
 
@@ -174,7 +197,8 @@
     }
 
     @Override
-    public void close() throws Exception {
+    protected void shutDown() throws Exception {
+        super.shutDown();
         lock.lock();
         try {
             changeLog.close();
@@ -229,6 +253,8 @@
                         break;
                 }
 
+                listeners.forEach(listener -> listener.notify(entry));
+
                 cachePosition = Optional.of( entry.getPosition() );
             }
 
@@ -247,4 +273,52 @@
             }
         }
     }
+
+    @Override
+    protected void runOneIteration() throws Exception {
+        lock.lock();
+        try {
+            updateCache();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    protected Scheduler scheduler() {
+        return scheduler;
+    }
+
+    @Override
+    public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener) {
+        //locks to prevent the current state from changing while subscribing.
+        lock.lock();
+        try {
+            listeners.add(listener);
+
+            //return the current state of the query repository
+            return queriesCache.values()
+                    .stream()
+                    .collect(Collectors.toSet());
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void unsubscribe(final QueryChangeLogListener listener) {
+        lock.lock();
+        try {
+            listeners.remove(listener);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void checkState() {
+        if (!super.isRunning() && !listeners.isEmpty()) {
+            throw new IllegalStateException(
+                    "The Query Repository is subscribed to, but the service has not been started.");
+        }
+    }
 }
\ No newline at end of file
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
new file mode 100644
index 0000000..2b61227
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.queries;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Listener to be notified when {@link QueryChange}s occur on a {@link QueryChangeLog}.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface QueryChangeLogListener {
+    /**
+     * Notifies the listener that a query change event has occurred in the change log.
+     * <p>
+     * <b>Note:</b>
+     * <p>
+     * The QueryRepository blocks when notifying this listener.  Long lasting operations
+     * should not be performed within this function.  Doing so will block all operations
+     * on the repository.
+     *
+     * @param queryChangeEvent - The event that occurred. (not null)
+     */
+    public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent);
+}
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
index fd51b2f..4d8b2db 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
@@ -25,14 +25,20 @@
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 
+import com.google.common.util.concurrent.Service;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Repository for adding, deleting, and listing active queries in Rya Streams.
+ *
+ * This service only needs to be started if it is being subscribed to. An
+ * {@link IllegalStateException} will be thrown if the service is subscribed to
+ * and used without being started.
  */
 @DefaultAnnotation(NonNull.class)
-public interface QueryRepository extends AutoCloseable {
+public interface QueryRepository extends Service {
 
     /**
      * Adds a new query to Rya Streams.
@@ -42,8 +48,9 @@
      *   otherwise {@code false}.
      * @return The {@link StreamsQuery} used in Rya Streams.
      * @throws QueryRepositoryException Could not add the query.
+     * @throws IllegalStateException The Service has not been started, but has been subscribed to.
      */
-    public StreamsQuery add(final String query, boolean isActive) throws QueryRepositoryException;
+    public StreamsQuery add(final String query, boolean isActive) throws QueryRepositoryException, IllegalStateException;
 
     /**
      * Updates the isActive state of a {@link StreamsQuery}. Setting this value to {@code true}
@@ -53,8 +60,9 @@
      * @param queryId - Identifies which query will be updated. (not null)
      * @param isActive - The new isActive state for the query.
      * @throws QueryRepositoryException If the query does not exist or something else caused the change to fail.
+     * @throws IllegalStateException The Service has not been started, but has been subscribed to.
      */
-    public void updateIsActive(UUID queryId, boolean isActive) throws QueryRepositoryException;
+    public void updateIsActive(UUID queryId, boolean isActive) throws QueryRepositoryException, IllegalStateException;
 
     /**
      * Get an existing query from Rya Streams.
@@ -62,24 +70,42 @@
      * @param queryId - Identifies which query will be fetched.
      * @return the {@link StreamsQuery} for the id if one exists; otherwise empty.
      * @throws QueryRepositoryException The query could not be fetched.
+     * @throws IllegalStateException The Service has not been started, but has been subscribed to.
      */
-    public Optional<StreamsQuery> get(UUID queryId) throws QueryRepositoryException;
+    public Optional<StreamsQuery> get(UUID queryId) throws QueryRepositoryException, IllegalStateException;
 
     /**
      * Removes an existing query from Rya Streams.
      *
      * @param queryID - The {@link UUID} of the query to remove. (not null)
      * @throws QueryRepositoryException Could not delete the query.
+     * @throws IllegalStateException The Service has not been started, but has been subscribed to.
      */
-    public void delete(UUID queryID) throws QueryRepositoryException;
+    public void delete(UUID queryID) throws QueryRepositoryException, IllegalStateException;
 
     /**
      * Lists all existing queries in Rya Streams.
      *
      * @return - A List of the current {@link StreamsQuery}s
      * @throws QueryRepositoryException The {@link StreamsQuery}s could not be listed.
+     * @throws IllegalStateException The Service has not been started, but has been subscribed to.
      */
-    public Set<StreamsQuery> list() throws QueryRepositoryException;
+    public Set<StreamsQuery> list() throws QueryRepositoryException, IllegalStateException;
+
+    /**
+     * Subscribes a {@link QueryChangeLogListener} to the {@link QueryRepository}.
+     *
+     * @param listener - The {@link QueryChangeLogListener} to subscribe to this {@link QueryRepository}. (not null)
+     * @return The current state of the repository in the form of {@link StreamsQuery}s.
+     */
+    public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener);
+
+    /**
+     * Unsubscribe a {@link QueryChangeLogListener} from the {@link QueryRepository}.
+     *
+     * @param listener - The {@link QueryChangeLogListener} to unsubscribe from this {@link QueryRepository}. (not null)
+     */
+    public void unsubscribe(final QueryChangeLogListener listener);
 
     /**
      * A function of {@link QueryRepository} was unable to perform a function.
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 22e616d..76c3216 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -20,6 +20,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -27,56 +28,62 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
 /**
  * Unit tests the methods of {@link InMemoryQueryRepository}.
  */
 public class InMemoryQueryRepositoryTest {
+    private static final Scheduler SCHEDULE = Scheduler.newFixedRateSchedule(0L, 100, TimeUnit.MILLISECONDS);
 
     @Test
     public void canReadAddedQueries() throws Exception {
         // Setup a totally in memory QueryRepository.
-        try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
-            // Add some queries to it.
-            final Set<StreamsQuery> expected = new HashSet<>();
-            expected.add( queries.add("query 1", true) );
-            expected.add( queries.add("query 2", false) );
-            expected.add( queries.add("query 3", true) );
+        final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+        // Add some queries to it.
+        final Set<StreamsQuery> expected = new HashSet<>();
+        expected.add( queries.add("query 1", true) );
+        expected.add( queries.add("query 2", false) );
+        expected.add( queries.add("query 3", true) );
 
-            // Show they are in the list of all queries.
-            final Set<StreamsQuery> stored = queries.list();
-            assertEquals(expected, stored);
-        }
+        // Show they are in the list of all queries.
+        final Set<StreamsQuery> stored = queries.list();
+        assertEquals(expected, stored);
     }
 
     @Test
     public void deletedQueriesDisappear() throws Exception {
         // Setup a totally in memory QueryRepository.
-        try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
-            // Add some queries to it. The second one we will delete.
-            final Set<StreamsQuery> expected = new HashSet<>();
-            expected.add( queries.add("query 1", true) );
-            final UUID deletedMeId = queries.add("query 2", false).getQueryId();
-            expected.add( queries.add("query 3", true) );
+        final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+        // Add some queries to it. The second one we will delete.
+        final Set<StreamsQuery> expected = new HashSet<>();
+        expected.add( queries.add("query 1", true) );
+        final UUID deletedMeId = queries.add("query 2", false).getQueryId();
+        expected.add( queries.add("query 3", true) );
 
-            // Delete the second query.
-            queries.delete( deletedMeId );
+        // Delete the second query.
+        queries.delete( deletedMeId );
 
-            // Show only queries 1 and 3 are in the list.
-            final Set<StreamsQuery> stored = queries.list();
-            assertEquals(expected, stored);
-        }
+        // Show only queries 1 and 3 are in the list.
+        final Set<StreamsQuery> stored = queries.list();
+        assertEquals(expected, stored);
     }
 
     @Test
     public void initializedWithPopulatedChangeLog() throws Exception {
         // Setup a totally in memory QueryRepository. Hold onto the change log so that we can use it again later.
         final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
-        try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) {
+        final QueryRepository queries = new InMemoryQueryRepository( changeLog, SCHEDULE );
+        try {
+            queries.startAndWait();
             // Add some queries and deletes to it.
             final Set<StreamsQuery> expected = new HashSet<>();
             expected.add( queries.add("query 1", true) );
@@ -85,11 +92,16 @@
             queries.delete( deletedMeId );
 
             // Create a new totally in memory QueryRepository.
-            try(final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog )) {
+            final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE );
+            try {
                 // Listing the queries should work using an initialized change log.
                 final Set<StreamsQuery> stored = initializedQueries.list();
                 assertEquals(expected, stored);
+            } finally {
+                queries.stop();
             }
+        } finally {
+            queries.stop();
         }
     }
 
@@ -100,50 +112,132 @@
         when(changeLog.readFromStart()).thenThrow(new QueryChangeLogException("Mocked exception."));
 
         // Create the QueryRepository and invoke one of the methods.
-        try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) {
-            queries.list();
-        }
+        final QueryRepository queries = new InMemoryQueryRepository( changeLog, SCHEDULE );
+        queries.list();
     }
 
     @Test
     public void get_present() throws Exception {
         // Setup a totally in memory QueryRepository.
-        try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
-            // Add a query to it.
-            final StreamsQuery query = queries.add("query 1", true);
+        final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+        // Add a query to it.
+        final StreamsQuery query = queries.add("query 1", true);
 
-            // Show the fetched query matches the expected ones.
-            final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
-            assertEquals(query, fetched.get());
-        }
+        // Show the fetched query matches the expected ones.
+        final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
+        assertEquals(query, fetched.get());
     }
 
     @Test
     public void get_notPresent() throws Exception {
         // Setup a totally in memory QueryRepository.
-        try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
-            // Fetch a query that was never added to the repository.
-            final Optional<StreamsQuery> query = queries.get(UUID.randomUUID());
+        final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+        // Fetch a query that was never added to the repository.
+        final Optional<StreamsQuery> query = queries.get(UUID.randomUUID());
 
-            // Show it could not be found.
-            assertFalse(query.isPresent());
-        }
+        // Show it could not be found.
+        assertFalse(query.isPresent());
     }
 
     @Test
     public void update() throws Exception {
         // Setup a totally in memory QueryRepository.
-        try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+        final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+        // Add a query to it.
+        final StreamsQuery query = queries.add("query 1", true);
+
+        // Change the isActive state of that query.
+        queries.updateIsActive(query.getQueryId(), false);
+
+        // Show the fetched query matches the expected one.
+        final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
+        final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false);
+        assertEquals(expected, fetched.get());
+    }
+
+    @Test
+    public void updateListenerNotify() throws Exception {
+        // Setup a totally in memory QueryRepository.
+        final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+        try {
+            queries.startAndWait();
+
             // Add a query to it.
             final StreamsQuery query = queries.add("query 1", true);
 
-            // Change the isActive state of that query.
-            queries.updateIsActive(query.getQueryId(), false);
+            final Set<StreamsQuery> existing = queries.subscribe(new QueryChangeLogListener() {
+                @Override
+                public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
+                    final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L,
+                            QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                    assertEquals(expected, queryChangeEvent);
+                }
+            });
 
-            // Show the fetched query matches the expected one.
-            final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
-            final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false);
-            assertEquals(expected, fetched.get());
+            assertEquals(Sets.newHashSet(query), existing);
+
+            queries.add("query 2", true);
+        } finally {
+            queries.stop();
         }
     }
+
+    @Test
+    public void updateListenerNotify_multiClient() throws Exception {
+        // Setup a totally in memory QueryRepository.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+        final QueryRepository queries = new InMemoryQueryRepository( changeLog, SCHEDULE );
+        final QueryRepository queries2 = new InMemoryQueryRepository( changeLog, SCHEDULE );
+
+        try {
+            queries.startAndWait();
+            queries2.startAndWait();
+
+            //show listener on repo that query was added to is being notified of the new query.
+            final CountDownLatch repo1Latch = new CountDownLatch(1);
+            queries.subscribe(new QueryChangeLogListener() {
+                @Override
+                public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
+                    final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+                            QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                    assertEquals(expected, queryChangeEvent);
+                    repo1Latch.countDown();
+                }
+            });
+
+            //show listener not on the repo that query was added to is being notified as well.
+            final CountDownLatch repo2Latch = new CountDownLatch(1);
+            queries2.subscribe(new QueryChangeLogListener() {
+                @Override
+                public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
+                    final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+                            QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                    assertEquals(expected, queryChangeEvent);
+                    repo2Latch.countDown();
+                }
+            });
+
+            queries.add("query 2", true);
+
+            assertTrue(repo1Latch.await(5, TimeUnit.SECONDS));
+            assertTrue(repo2Latch.await(5, TimeUnit.SECONDS));
+        } catch(final InterruptedException e ) {
+            System.out.println("PING");
+        } finally {
+            queries.stop();
+            queries2.stop();
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void subscribe_notStarted() throws Exception {
+        // Setup a totally in memory QueryRepository.
+        final QueryRepository queries = new InMemoryQueryRepository(new InMemoryQueryChangeLog(), SCHEDULE);
+        queries.subscribe(new QueryChangeLogListener() {
+            @Override
+            public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {}
+        });
+
+        queries.add("query 2", true);
+    }
 }
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
index 275a975..9273c33 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
@@ -20,6 +20,8 @@
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.AddQuery;
@@ -35,6 +37,7 @@
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -115,8 +118,11 @@
         final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
         final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
 
+        //The AddQuery command doesn't use the scheduled service feature.
+        final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS);
+        final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler);
         // Execute the add query command.
-        try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+        try {
             final AddQuery addQuery = new DefaultAddQuery(queryRepo);
             try {
                 final StreamsQuery query = addQuery.addQuery(params.query, Boolean.parseBoolean(params.isActive));
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
index 2aeb90c..0d96df0 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
@@ -21,6 +21,7 @@
 import static java.util.Objects.requireNonNull;
 
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.DeleteQuery;
@@ -36,6 +37,7 @@
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -113,8 +115,11 @@
         final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
         final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
 
+        //The DeleteQuery command doesn't use the scheduled service feature.
+        final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS);
+        final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler);
         // Execute the delete query command.
-        try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+        try {
             final DeleteQuery deleteQuery = new DefaultDeleteQuery(queryRepo);
             try {
                 deleteQuery.delete(UUID.fromString(params.queryId));
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index 670007b..cd78975 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -21,6 +21,7 @@
 import static java.util.Objects.requireNonNull;
 
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
@@ -35,6 +36,7 @@
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.ParameterException;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -83,8 +85,11 @@
         final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
         final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
 
+        //The ListQueries command doesn't use the scheduled service feature.
+        final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS);
+        final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler);
         // Execute the list queries command.
-        try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+        try {
             final ListQueries listQueries = new DefaultListQueries(queryRepo);
             try {
                 final Set<StreamsQuery> queries = listQueries.all();
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index 8f7f162..ddaf647 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -24,6 +24,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
@@ -39,6 +40,7 @@
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -117,8 +119,11 @@
         final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
         final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
 
+        //The RunQuery command doesn't use the scheduled service feature.
+        final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS);
+        final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler);
         // Look up the query to be executed from the change log.
-        try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+        try {
             try {
                 final UUID queryId = UUID.fromString( params.queryId );
                 final Optional<StreamsQuery> query = queryRepo.get(queryId);
@@ -145,7 +150,7 @@
             } catch(final Exception e) {
                 throw new ExecutionException("Could not execute the Run Query command.", e);
             }
-        } catch(final ArgumentsException | ExecutionException e) {
+        } catch(final ExecutionException e) {
             // Rethrow the exceptions that are advertised by execute.
             throw e;
         } catch (final Exception e) {
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 7c548f1..3612dd0 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -22,6 +22,7 @@
 
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.rya.streams.api.entity.QueryResultStream;
@@ -45,6 +46,7 @@
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -132,9 +134,12 @@
             throw new ArgumentsException("Invalid Query ID " + params.queryId);
         }
 
+        //The DeleteQuery command doesn't use the scheduled service feature.
+        final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS);
+        final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler);
         // Fetch the SPARQL of the query whose results will be streamed.
         final String sparql;
-        try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+        try {
             final Optional<StreamsQuery> sQuery = queryRepo.get(queryId);
             if(!sQuery.isPresent()) {
                 throw new ExecutionException("Could not read the results for query with ID " + queryId +
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index 8b4f074..3bfbadc 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -22,6 +22,7 @@
 
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
@@ -38,11 +39,12 @@
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.apache.rya.test.kafka.KafkaTestUtil;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
 /**
  * integration Test for adding a new query through a command.
  */
@@ -64,12 +66,7 @@
         final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
         final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
-        queryRepo = new InMemoryQueryRepository(changeLog);
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        queryRepo.close();
+        queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
     }
 
     @Test
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index 6083543..7bec080 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -23,6 +23,7 @@
 
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
@@ -39,11 +40,12 @@
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.apache.rya.test.kafka.KafkaTestUtil;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
 /**
  * Integration Test for deleting a query from Rya Streams through a command.
  */
@@ -66,12 +68,7 @@
         final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
         final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
-        queryRepo = new InMemoryQueryRepository(changeLog);
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        queryRepo.close();
+        queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
     }
 
     @Test
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index 1399142..f6ceb75 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -19,6 +19,7 @@
 package org.apache.rya.streams.client.command;
 
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
@@ -34,11 +35,12 @@
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.apache.rya.test.kafka.KafkaTestUtil;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
 /**
  * integration Test for listing queries through a command.
  */
@@ -60,12 +62,7 @@
         final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
         final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
-        queryRepo = new InMemoryQueryRepository(changeLog);
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        queryRepo.close();
+        queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
     }
 
     @Test
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 3389d6b..7e3b8bc 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
@@ -56,6 +57,7 @@
 import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 /**
  * Integration tests the methods of {@link RunQueryCommand}.
@@ -81,7 +83,7 @@
         final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
         final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
-        queryRepo = new InMemoryQueryRepository(changeLog);
+        queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
 
         // Initialize the Statements Producer and the Results Consumer.
         stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
@@ -92,7 +94,6 @@
     public void cleanup() throws Exception{
         stmtProducer.close();
         resultConsumer.close();
-        queryRepo.close();
     }
 
     @Test(expected = ExecutionException.class)
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
index 9a773f0..5dbd27f 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
@@ -52,6 +53,7 @@
 import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 /**
  * Integration tests the methods of {@link KafkaRunQuery}.
@@ -83,7 +85,7 @@
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
 
         // This query is completely in memory, so it doesn't need to be closed.
-        final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() );
+        final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS) );
 
         // Add the query to the query repository.
         final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
