RYA-451 Fixing threading issues with the QueryManager class.
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
index 7194834..11423bd 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
@@ -85,7 +85,7 @@
         }
         return false;
     }
-    
+
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder();
@@ -95,7 +95,7 @@
         sb.append(getSparql() + "\n");
         sb.append("Is ");
         if (!isActive) {
-            sb.append(" Not ");
+            sb.append("Not ");
         }
         sb.append("Running.\n");
         return sb.toString();
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index 5fb0297..95c1922 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -51,9 +51,9 @@
  */
 @DefaultAnnotation(NonNull.class)
 public class InMemoryQueryRepository extends AbstractScheduledService implements QueryRepository {
-    private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class);
+    private static final Logger log = LoggerFactory.getLogger(InMemoryQueryRepository.class);
 
-    private final ReentrantLock lock = new ReentrantLock();
+    private final ReentrantLock lock = new ReentrantLock(true);
 
     /**
      * The change log that is the ground truth for describing what the queries look like.
@@ -198,7 +198,6 @@
 
     @Override
     protected void shutDown() throws Exception {
-        super.shutDown();
         lock.lock();
         try {
             changeLog.close();
@@ -211,11 +210,12 @@
      * Updates the {@link #queriesCache} to reflect the latest position within the {@link #changeLog}.
      */
     private void updateCache() {
-        requireNonNull(changeLog);
+        log.trace("updateCache() - Enter");
 
         CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> it = null;
         try {
             // Iterate over everything since the last position that was handled within the change log.
+            log.debug("Starting cache position:" + cachePosition);
             if(cachePosition.isPresent()) {
                 it = changeLog.readFromPosition(cachePosition.get() + 1);
             } else {
@@ -228,6 +228,8 @@
                 final QueryChange change = entry.getEntry();
                 final UUID queryId = change.getQueryId();
 
+                log.debug("Updating the cache to reflect:\n" + change);
+
                 switch(change.getChangeType()) {
                     case CREATE:
                         final StreamsQuery query = new StreamsQuery(
@@ -253,15 +255,17 @@
                         break;
                 }
 
+                log.debug("Notifying listeners with the updated state.");
                 final Optional<StreamsQuery> newQueryState = Optional.ofNullable(queriesCache.get(queryId));
                 listeners.forEach(listener -> listener.notify(entry, newQueryState));
 
                 cachePosition = Optional.of( entry.getPosition() );
+                log.debug("New chache position: " + cachePosition);
             }
 
         } catch (final QueryChangeLogException e) {
             // Rethrow the exception because the object the supplier tried to create could not be created.
-            throw new RuntimeException("Could not initialize the " + InMemoryQueryRepository.class.getName(), e);
+            throw new RuntimeException("Could not update the cache of " + InMemoryQueryRepository.class.getName(), e);
 
         } finally {
             // Try to close the iteration if it was opened.
@@ -270,18 +274,22 @@
                     it.close();
                 }
             } catch (final QueryChangeLogException e) {
-                LOG.error("Could not close the " + CloseableIteration.class.getName(), e);
+                log.error("Could not close the " + CloseableIteration.class.getName(), e);
             }
+
+            log.trace("updateCache() - Exit");
         }
     }
 
     @Override
     protected void runOneIteration() throws Exception {
+        log.trace("runOneIteration() - Enter");
         lock.lock();
         try {
             updateCache();
         } finally {
             lock.unlock();
+            log.trace("runOneIteration() - Exit");
         }
     }
 
@@ -292,17 +300,25 @@
 
     @Override
     public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener) {
+        log.trace("subscribe(listener) - Enter");
+
         //locks to prevent the current state from changing while subscribing.
         lock.lock();
+        log.trace("subscribe(listener) - Acquired lock");
         try {
             listeners.add(listener);
+            log.trace("subscribe(listener) - Listener Registered");
 
             //return the current state of the query repository
-            return queriesCache.values()
+            final Set<StreamsQuery> queries = queriesCache.values()
                     .stream()
                     .collect(Collectors.toSet());
+            log.trace("subscribe(listener) - Returning " + queries.size() + " existing queries");
+            return queries;
         } finally {
+            log.trace("subscribe(listener) - Releasing lock");
             lock.unlock();
+            log.trace("subscribe(listener) - Exit");
         }
     }
 
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
index d283957..d34a394 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
@@ -110,6 +110,16 @@
         return false;
     }
 
+    @Override
+    public String toString() {
+        return "QueryChange: {" +
+               "    Query ID: " + queryId + ",\n" +
+               "    Change Type: " + changeType + ",\n" +
+               "    Is Active: " + isActive + ",\n" +
+               "    SPARQL: " + sparql + "\n" +
+               "}";
+    }
+
     /**
      * Create a {@link QueryChange} that represents a new SPARQL query that will be managed by Rya Streams.
      *
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 3b3d48a..d7e116b 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -93,13 +93,9 @@
 
             // Create a new totally in memory QueryRepository.
             final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE );
-            try {
-                // Listing the queries should work using an initialized change log.
-                final Set<StreamsQuery> stored = initializedQueries.list();
-                assertEquals(expected, stored);
-            } finally {
-                queries.stop();
-            }
+            // Listing the queries should work using an initialized change log.
+            final Set<StreamsQuery> stored = initializedQueries.list();
+            assertEquals(expected, stored);
         } finally {
             queries.stop();
         }
@@ -166,7 +162,7 @@
             final StreamsQuery query = queries.add("query 1", true);
 
             final Set<StreamsQuery> existing = queries.subscribe((queryChangeEvent, newQueryState) -> {
-                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L,
+                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(1L,
                         QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
                 final Optional<StreamsQuery> expectedQueryState = Optional.of(
                         new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
@@ -197,7 +193,7 @@
             //show listener on repo that query was added to is being notified of the new query.
             final CountDownLatch repo1Latch = new CountDownLatch(1);
             queries.subscribe((queryChangeEvent, newQueryState) -> {
-                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
                         QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
                 final Optional<StreamsQuery> expectedQueryState = Optional.of(
                         new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
@@ -210,7 +206,7 @@
             //show listener not on the repo that query was added to is being notified as well.
             final CountDownLatch repo2Latch = new CountDownLatch(1);
             queries2.subscribe((queryChangeEvent, newQueryState) -> {
-                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
                         QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
                 final Optional<StreamsQuery> expectedQueryState = Optional.of(
                         new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index cd78975..a5507a6 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -23,6 +23,7 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.ListQueries;
@@ -112,12 +113,11 @@
         sb.append("Queries in Rya Streams:\n");
         sb.append("---------------------------------------------------------\n");
         queries.forEach(query -> {
-            sb.append("ID: ");
-            sb.append(query.getQueryId());
-            sb.append("\t\t");
-            sb.append("Query: ");
-            sb.append(query.getSparql());
-            sb.append("\n");
+            sb.append("ID: ").append(query.getQueryId())
+                .append("    ")
+                .append("Is Active: ").append(query.isActive())
+                .append(StringUtils.rightPad("" + query.isActive(), 9))
+                .append("Query: ").append(query.getSparql()).append("\n");
         });
         return sb.toString();
     }
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index ddaf647..7b311f6 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -136,7 +136,7 @@
                 final Set<String> topics = new HashSet<>();
                 topics.add( KafkaTopics.statementsTopic(params.ryaInstance) );
                 topics.add( KafkaTopics.queryResultsTopic(queryId) );
-                KafkaTopics.createTopic(params.zookeeperServers, topics, 1, 1);
+                KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1);
 
                 // Run the query that uses those topics.
                 final KafkaRunQuery runQuery = new KafkaRunQuery(
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index 095465c..989799a 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -55,7 +55,7 @@
     }
 
     /**
-     * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}.
+     * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChangeLog}.
      * <p/>
      * This is the inverse function of {@link #queryChangeLogTopic(String)}.
      *
@@ -106,7 +106,7 @@
      * @param partitions - The number of partitions that each of the topics will have.
      * @param replicationFactor - The replication factor of the topics that are created.
      */
-    public static void createTopic(
+    public static void createTopics(
             final String zookeeperServers,
             final Set<String> topicNames,
             final int partitions,
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
index 7ab7e90..8093951 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
@@ -83,7 +83,7 @@
         try {
             final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory());
             return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));
-        } catch (MalformedQueryException | TopologyBuilderException e) {
+        } catch (final MalformedQueryException | TopologyBuilderException e) {
             throw new KafkaStreamsFactoryException("Could not create a KafkaStreams processing topology for query " + query.getQueryId(), e);
         }
     }
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
new file mode 100644
index 0000000..771e1c8
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Set;
+
+import org.apache.rya.streams.kafka.KafkaTopics;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates topics in Kafka.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CreateKafkaTopic {
+
+    private final String zookeeperServers;
+
+    /**
+     * Constructs an instance of {@link CreateKafkaTopic}.
+     *
+     * @param zookeeperServers - The Zookeeper servers that are used to manage the Kafka instance. (not null)
+     */
+    public CreateKafkaTopic(final String zookeeperServers) {
+        this.zookeeperServers = requireNonNull(zookeeperServers);
+    }
+
+    /**
+     * Creates a set of Kafka topics for each topic that does not already exist.
+     *
+     * @param topicNames - The topics that will be created. (not null)
+     * @param partitions - The number of partitions that each of the topics will have.
+     * @param replicationFactor - The replication factor of the topics that are created.
+     */
+    public void createTopics(
+            final Set<String> topicNames,
+            final int partitions,
+            final int replicationFactor) {
+        KafkaTopics.createTopics(zookeeperServers, topicNames, partitions, replicationFactor);
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/main/README.txt b/extras/rya.streams/query-manager/src/main/README.txt
index 3b2dbfe..93d5ac5 100644
--- a/extras/rya.streams/query-manager/src/main/README.txt
+++ b/extras/rya.streams/query-manager/src/main/README.txt
@@ -41,8 +41,11 @@
     yum install -y ${project.artifactId}-${project.version}.noarch.rpm
     
  3. Update the configuration file:
-    Replace "[Kafka Broker Hostname]" with the IP address of the Kafka broker.
-    Replace the Kafka port if using something other than the default of 9092.
+    A. Replace "[Zookeepers used to manage Kafka. E.g.: zoo1,zoo2,zoo3]" with 
+       the zookeepers used to manage the Kafka cluster. It is a comma separated 
+       list.
+    B. Replace "[Kafka Broker Hostname]" with the IP address of the Kafka broker.
+    C. Replace the Kafka port if using something other than the default of 9092.
     
  4. Start the service:
     systemctl start rya-streams-query-manager.service
diff --git a/extras/rya.streams/query-manager/src/main/config/configuration.xml b/extras/rya.streams/query-manager/src/main/config/configuration.xml
index 96da501..7077125 100644
--- a/extras/rya.streams/query-manager/src/main/config/configuration.xml
+++ b/extras/rya.streams/query-manager/src/main/config/configuration.xml
@@ -18,7 +18,7 @@
 under the License.
 -->
 <queryManagerConfig>
-    <!-- The  Query Change Log Sources. The source defines a system where Rya
+    <!-- The Query Change Log Sources. The source defines a system where Rya
        - Streams Query Change Logs are managed. The query manager will manage 
        - queries for all Rya instances whose change logs are stored within the
        - source.
@@ -29,6 +29,14 @@
             <port>9092</port>
         </kafka>
     </queryChangeLogSource>
+
+    <!-- The Query Executor. The executor defines a system for executing the
+         Rya Streams queries. -->    
+    <queryExecutor>
+        <localKafkaStreams>
+            <zookeepers>[Zookeepers used to manage Kafka. E.g.: zoo1,zoo2,zoo3]</zookeepers>
+        </localKafkaStreams>
+    </queryExecutor>
     
     <!-- This section defines performance related tuning values. Sensible
        - default have been provided to simplify configuration. 
diff --git a/extras/rya.streams/query-manager/src/main/config/log4j.xml b/extras/rya.streams/query-manager/src/main/config/log4j.xml
index 2021638..96ea56c 100644
--- a/extras/rya.streams/query-manager/src/main/config/log4j.xml
+++ b/extras/rya.streams/query-manager/src/main/config/log4j.xml
@@ -28,6 +28,23 @@
         </layout>
     </appender>
 
+    <!-- Kafka configuration configs are loud. -->
+    <logger name="org.apache.kafka.streams.StreamsConfig">
+        <level value="OFF"/>
+    </logger>
+    <logger name="org.apache.kafka.clients.consumer.ConsumerConfig">
+        <level value="OFF"/>
+    </logger>
+    <logger name="org.apache.kafka.clients.producer.ProducerConfig">
+        <level value="OFF"/>
+    </logger>
+
+    <!-- Change this level to DEBUG to see more information about what the 
+         QueryManager is doing. -->
+    <logger name="org.apache.rya.streams.querymanager.QueryManager">
+        <level value="INFO"/>
+    </logger>
+
     <root>
         <level value="INFO" />
         <appender-ref ref="console" />
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
index 73e5d12..eb5ca08 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
@@ -51,8 +51,9 @@
     public void unsubscribe(final SourceListener listener);
 
     /**
-     * A listener that is notified when a {@link QueryChangeLog} has
-     * been added or removed from a {@link QueryChangeLogSource}.
+     * A listener that is notified when a {@link QueryChangeLog} has been added or
+     * removed from a {@link QueryChangeLogSource}. The listener receives the only
+     * copy of the change log and is responsible for shutting it down.
      */
     @DefaultAnnotation(NonNull.class)
     public interface SourceListener {
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
index 30b4538..e6bd800 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
@@ -1,18 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.rya.streams.querymanager;
 
@@ -20,16 +22,23 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.queries.ChangeLogEntry;
 import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
 import org.apache.rya.streams.api.queries.QueryChange;
-import org.apache.rya.streams.api.queries.QueryChange.ChangeType;
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryChangeLogListener;
 import org.apache.rya.streams.api.queries.QueryRepository;
@@ -38,8 +47,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -51,205 +62,823 @@
  * instances/rya streams instances.
  */
 @DefaultAnnotation(NonNull.class)
-public class QueryManager extends AbstractIdleService {
-    private static final Logger LOG = LoggerFactory.getLogger(QueryManager.class);
-
-    private final QueryExecutor queryExecutor;
-    private final Scheduler scheduler;
+public class QueryManager extends AbstractService {
+    private static final Logger log = LoggerFactory.getLogger(QueryManager.class);
 
     /**
-     * Map of Rya Instance name to {@link QueryRepository}.
+     * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific
+     * Rya instnace.
      */
-    private final Map<String, QueryRepository> queryRepos = new HashMap<>();
+    private final QueryChangeLogSource changeLogSource;
 
-    private final ReentrantLock lock = new ReentrantLock();
+    /**
+     * The engine that is responsible for executing {@link StreamsQuery}s.
+     */
+    private final QueryExecutor queryExecutor;
 
-    private final QueryChangeLogSource source;
+    /**
+     * How long blocking operations will be attempted before potentially trying again.
+     */
+    private final long blockingValue;
+
+    /**
+     * The units for {@link #blockingValue}.
+     */
+    private final TimeUnit blockingUnits;
+
+    /**
+     * Used to inform threads that the application is shutting down, so they must stop work.
+     */
+    private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+    /**
+     * This thread pool manages the two thread used to work the {@link LogEvent}s
+     * and the {@link QueryEvent}s.
+     */
+    private final ExecutorService executor = Executors.newFixedThreadPool(2);
 
     /**
      * Creates a new {@link QueryManager}.
      *
      * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null)
      * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null)
-     * @param scheduler - The {@link Scheduler} used to discover query changes
-     *      within the {@link QueryChangeLog}s (not null)
+     * @param blockingValue - How long blocking operations will try before looping. (> 0)
+     * @param blockingUnits - The units of the {@code blockingValue}. (not null)
      */
-    public QueryManager(final QueryExecutor queryExecutor, final QueryChangeLogSource source, final Scheduler scheduler) {
-        this.source = requireNonNull(source);
+    public QueryManager(
+            final QueryExecutor queryExecutor,
+            final QueryChangeLogSource source,
+            final long blockingValue,
+            final TimeUnit blockingUnits) {
+        this.changeLogSource = requireNonNull(source);
         this.queryExecutor = requireNonNull(queryExecutor);
-        this.scheduler = requireNonNull(scheduler);
-    }
-
-    /**
-     * Starts running a query.
-     *
-     * @param ryaInstanceName - The Rya instance the query belongs to. (not null)
-     * @param query - The query to run.(not null)
-     */
-    private void runQuery(final String ryaInstanceName, final StreamsQuery query) {
-        requireNonNull(ryaInstanceName);
-        requireNonNull(query);
-        LOG.info("Starting Query: " + query.toString() + " on the rya instance: " + ryaInstanceName);
-
-        try {
-            queryExecutor.startQuery(ryaInstanceName, query);
-        } catch (final QueryExecutorException e) {
-            LOG.error("Failed to start query.", e);
-        }
-    }
-
-    /**
-     * Stops the specified query from running.
-     *
-     * @param queryId - The ID of the query to stop running. (not null)
-     */
-    private void stopQuery(final UUID queryId) {
-        requireNonNull(queryId);
-
-        LOG.info("Stopping query: " + queryId.toString());
-
-        try {
-            queryExecutor.stopQuery(queryId);
-        } catch (final QueryExecutorException e) {
-            LOG.error("Failed to stop query.", e);
-        }
+        Preconditions.checkArgument(blockingValue > 0, "The blocking value must be > 0. Was: " + blockingValue);
+        this.blockingValue = blockingValue;
+        this.blockingUnits = requireNonNull(blockingUnits);
     }
 
     @Override
-    protected void startUp() throws Exception {
-        lock.lock();
+    protected void doStart() {
+        log.info("Starting a QueryManager.");
+
+        // A work queue of discovered Query Change Logs that need to be handled.
+        // This queue exists so that the source notifying thread may be released
+        // immediately instead of calling into blocking functions.
+        final BlockingQueue<LogEvent> logEvents = new ArrayBlockingQueue<>(1024);
+
+        // A work queue of discovered Query Changes from the monitored Query Change Logs
+        // that need to be handled. This queue exists so that the Query Repository notifying
+        // thread may be released immediately instead of calling into blocking functions.
+        final BlockingQueue<QueryEvent> queryEvents = new ArrayBlockingQueue<>(1024);
+
         try {
-            LOG.info("Starting Query Manager.");
+            // Start up a LogEventWorker using the executor service.
+            executor.submit(new LogEventWorker(logEvents, queryEvents, blockingValue, blockingUnits, shutdownSignal));
+
+            // Start up a QueryEvent Worker using the executor service.
+            executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal));
+
+            // Start up the query execution framework.
             queryExecutor.startAndWait();
-            source.startAndWait();
 
-            // subscribe to the sources to be notified of changes.
-            source.subscribe(new QueryManagerSourceListener());
-        } finally {
-            lock.unlock();
+            // Startup the source that discovers new Query Change Logs.
+            changeLogSource.startAndWait();
+
+            // Subscribe the source a listener that writes to the LogEventWorker's work queue.
+            changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal));
+        } catch(final RejectedExecutionException | UncheckedExecutionException e) {
+            log.error("Could not start up a QueryManager.", e);
+            notifyFailed(e);
         }
+
+        // Notify the service was successfully started.
+        notifyStarted();
+
+        log.info("QueryManager has finished starting.");
     }
 
     @Override
-    protected void shutDown() throws Exception {
-        lock.lock();
+    protected void doStop() {
+        log.info("Stopping a QueryManager.");
+
+        // Set the shutdown flag so that all components that rely on that signal will stop processing.
+        shutdownSignal.set(true);
+
+        // Stop the workers and wait for them to die.
+        executor.shutdownNow();
         try {
-            LOG.info("Stopping Query Manager.");
-            source.stopAndWait();
-            queryExecutor.stopAndWait();
-        } finally {
-            lock.unlock();
+            if(!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                log.warn("Waited 10 seconds for the worker threads to die, but they are still running.");
+            }
+        } catch (final InterruptedException e) {
+            log.warn("Waited 10 seconds for the worker threads to die, but they are still running.");
         }
+
+        // Stop the source of new Change Logs.
+        try {
+            changeLogSource.stopAndWait();
+        } catch(final UncheckedExecutionException e) {
+            log.warn("Could not stop the Change Log Source.", e);
+        }
+
+        // Stop the query execution framework.
+        try {
+            queryExecutor.stopAndWait();
+        } catch(final UncheckedExecutionException e) {
+            log.warn("Could not stop the Query Executor", e);
+        }
+
+        // Notify the service was successfully stopped.
+        notifyStopped();
+
+        log.info("QueryManager has finished stopping.");
     }
 
     /**
-     * An implementation of {@link QueryChangeLogListener} for the
-     * {@link QueryManager}.
-     * <p>
-     * When notified of a {@link ChangeType} performs one of the following:
-     * <li>{@link ChangeType#CREATE}: Creates a new query using the
-     * {@link QueryExecutor} provided to the {@link QueryManager}</li>
-     * <li>{@link ChangeType#DELETE}: Deletes a running query by stopping the
-     * {@link QueryExecutor} service of the queryID in the event</li>
-     * <li>{@link ChangeType#UPDATE}: If the query is running and the update is
-     * to stop the query, stops the query. Otherwise, if the query is not
-     * running, it is removed.</li>
+     * Offer a unit of work to a blocking queue until it is either accepted, or the
+     * shutdown signal is set.
+     *
+     * @param workQueue - The blocking work queue to write to. (not null)
+     * @param event - The event that will be offered to the work queue. (not null)
+     * @param offerValue - How long to wait when offering new work.
+     * @param offerUnits - The unit for the {@code offerValue}. (not null)
+     * @param shutdownSignal - Used to signal application shutdown has started, so
+     *   this method may terminate without ever placing the event on the queue. (not null)
+     * @return {@code true} if the evet nwas added to the queue, otherwise false.
      */
-    private class QueryExecutionForwardingListener implements QueryChangeLogListener {
-        private final String ryaInstanceName;
+    private static <T> boolean offerUntilAcceptedOrShutdown(
+            final BlockingQueue<T> workQueue,
+            final T event,
+            final long offerValue,
+            final TimeUnit offerUnits,
+            final AtomicBoolean shutdownSignal) {
+        requireNonNull(workQueue);
+        requireNonNull(event);
+        requireNonNull(shutdownSignal);
+
+        boolean submitted = false;
+        while(!submitted && !shutdownSignal.get()) {
+            try {
+                submitted = workQueue.offer(event, offerValue, offerUnits);
+                if(!submitted) {
+                    log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again...");
+                }
+            } catch (final InterruptedException e) {
+                log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again...");
+            }
+        }
+        return submitted;
+    }
+
+    /**
+     * An observation that a {@link QueryChangeLog} was created within or
+     * removed from a {@link QueryChangeLogSource}.
+     */
+    @DefaultAnnotation(NonNull.class)
+    static class LogEvent {
 
         /**
-         * Creates a new {@link QueryExecutionForwardingListener}.
-         *
-         * @param ryaInstanceName - The rya instance the query change is
-         *        performed on. (not null)
+         * The types of events that may be observed.
          */
-        public QueryExecutionForwardingListener(final String ryaInstanceName) {
-            this.ryaInstanceName = requireNonNull(ryaInstanceName);
+        static enum LogEventType {
+            /**
+             * A {@link QueryChangeLog} was created within a {@link QueryChangeLogSource}.
+             */
+            CREATE,
+
+            /**
+             * A {@link QueryChangeLog} was deleted from a {@link QueryChangeLogSource}.
+             */
+            DELETE;
+        }
+
+        private final String ryaInstance;
+        private final LogEventType eventType;
+        private final Optional<QueryChangeLog> log;
+
+        /**
+         * Constructs an instance of {@link LogEvent}.
+         *
+         * @param ryaInstance - The Rya Instance the log is/was for. (not null)
+         * @param eventType - The type of event that was observed. (not null)
+         * @param log - The log if this is a create event. (not null)
+         */
+        private LogEvent(final String ryaInstance, final LogEventType eventType, final Optional<QueryChangeLog> log) {
+            this.ryaInstance = requireNonNull(ryaInstance);
+            this.eventType = requireNonNull(eventType);
+            this.log = requireNonNull(log);
+        }
+
+        /**
+         * @return The Rya Instance whose log was either created or deleted.
+         */
+        public String getRyaInstanceName() {
+            return ryaInstance;
+        }
+
+        /**
+         * @return The type of event that was observed.
+         */
+        public LogEventType getEventType() {
+            return eventType;
+        }
+
+        /**
+         * @return The {@link QueryChangeLog} if this is a CREATE event.
+         */
+        public Optional<QueryChangeLog> getQueryChangeLog() {
+            return log;
         }
 
         @Override
-        public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
-            LOG.debug("New query change event.");
-            final QueryChange entry = queryChangeEvent.getEntry();
+        public String toString() {
+            return "LogEvent {\n" +
+                   "    Rya Instance: " + ryaInstance + ",\n" +
+                   "    Event Type: " + eventType + "\n" +
+                   "}";
+        }
 
-            lock.lock();
-            try {
+        /**
+         * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was created within a
+         * {@link QueryChangeLogSource}.
+         *
+         * @param ryaInstance - The Rya Instance the created log is for. (not null)
+         * @param log - The created {@link QueryChangeLog. (not null)
+         * @return A {@link LogEvent} built using the provided values.
+         */
+        public static LogEvent create(final String ryaInstance, final QueryChangeLog log) {
+            return new LogEvent(ryaInstance, LogEventType.CREATE, Optional.of(log));
+        }
 
-                switch (entry.getChangeType()) {
-                    case CREATE:
-                        if(!newQueryState.isPresent()) {
-                            LOG.error("The query with ID: " + entry.getQueryId() + " must be present with the change to be created.");
-                            LOG.debug("newQueryState is not allowed to be absent with a CREATE QueryChange, there might be a bug in the QueryRepository.");
-                        } else {
-                            runQuery(ryaInstanceName, newQueryState.get());
-                        }
-                        break;
-                    case DELETE:
-                        stopQuery(entry.getQueryId());
-                        break;
-                    case UPDATE:
-                        if (!newQueryState.isPresent()) {
-                            LOG.error("The query with ID: " + entry.getQueryId() + " must be provided with the update, cannot perform update.");
-                            LOG.debug("newQueryState is not allowed to be absent with a UPDATE QueryChange, there might be a bug in the QueryRepository.");
-                        } else {
-                            final StreamsQuery updatedQuery = newQueryState.get();
-                            if (updatedQuery.isActive()) {
-                                runQuery(ryaInstanceName, updatedQuery);
-                                LOG.info("Starting query: " + updatedQuery.toString());
-                            } else {
-                                stopQuery(updatedQuery.getQueryId());
-                                LOG.info("Stopping query: " + updatedQuery.toString());
-                            }
-                        }
-                        break;
-                }
-            } finally {
-                lock.unlock();
-            }
+        /**
+         * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was deleted from
+         * a {@link QueryChangeLogSource}.
+         *
+         * @param ryaInstance - The Rya Instance whose log was deleted. (not null)
+         * @return A {@link LogEvent} built using the provided values.
+         */
+        public static LogEvent delete(final String ryaInstance) {
+            return new LogEvent(ryaInstance, LogEventType.DELETE, Optional.empty());
         }
     }
 
     /**
-     * Listener used by the {@link QueryManager} to be notified when
-     * {@link QueryChangeLog}s are created or deleted.
+     * An observation that a {@link StreamsQuery} needs to be executing or not
+     * via the provided {@link QueryExecutor}.
      */
-    private class QueryManagerSourceListener implements SourceListener {
+    @DefaultAnnotation(NonNull.class)
+    static class QueryEvent {
+
+        /**
+         * The type of events that may be observed.
+         */
+        public static enum QueryEventType {
+            /**
+             * Indicates a {@link StreamsQuery} needs to be executing.
+             */
+            EXECUTING,
+
+            /**
+             * Indicates a {@link StreamsQuery} needs to be stopped.
+             */
+            STOPPED,
+
+            /**
+             * Indicates all {@link StreamsQuery}s for a Rya instance need to be stopped.
+             */
+            STOP_ALL;
+        }
+
+        private final String ryaInstance;
+        private final QueryEventType type;
+        private final Optional<UUID> queryId;
+        private final Optional<StreamsQuery> query;
+
+        /**
+         * Constructs an instance of {@link QueryEvent}.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @param type - Indicates whether the query needs to be executing or not. (not null)
+         * @param queryId - If stopped, the ID of the query that must not be running. (not null)
+         * @param query - If executing, the StreamsQuery that defines what should be executing. (not null)
+         */
+        private QueryEvent(
+                final String ryaInstance,
+                final QueryEventType type,
+                final Optional<UUID> queryId,
+                final Optional<StreamsQuery> query) {
+            this.ryaInstance = requireNonNull(ryaInstance);
+            this.type = requireNonNull(type);
+            this.queryId = requireNonNull(queryId);
+            this.query = requireNonNull(query);
+        }
+
+        /**
+         * @return The Rya instance that generated the event.
+         */
+        public String getRyaInstance() {
+            return ryaInstance;
+        }
+
+        /**
+         * @return Indicates whether the query needs to be executing or not.
+         */
+        public QueryEventType getType() {
+            return type;
+        }
+
+        /**
+         * @return If stopped, the ID of the query that must not be running. Otherwise absent.
+         */
+        public Optional<UUID> getQueryId() {
+            return queryId;
+        }
+
+        /**
+         * @return If executing, the StreamsQuery that defines what should be executing. Otherwise absent.
+         */
+        public Optional<StreamsQuery> getStreamsQuery() {
+            return query;
+        }
+
         @Override
-        public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
-            lock.lock();
-            try {
-                LOG.info("Discovered new Query Change Log for Rya Instance " + ryaInstanceName + ".");
-                final QueryRepository repo = new InMemoryQueryRepository(log, scheduler);
-                repo.startAndWait();
-                final Set<StreamsQuery> queries = repo.subscribe(new QueryExecutionForwardingListener(ryaInstanceName));
-                queries.forEach(query -> {
-                    if (query.isActive()) {
-                        try {
-                            queryExecutor.startQuery(ryaInstanceName, query);
-                        } catch (IllegalStateException | QueryExecutorException e) {
-                            LOG.error("Unable to start query for rya instance " + ryaInstanceName, e);
-                        }
-                    }
-                });
-                queryRepos.put(ryaInstanceName, repo);
-            } finally {
-                lock.unlock();
+        public int hashCode() {
+            return Objects.hash(ryaInstance, type, queryId, query);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if(o instanceof QueryEvent) {
+                final QueryEvent other = (QueryEvent) o;
+                return Objects.equals(ryaInstance, other.ryaInstance) &&
+                        Objects.equals(type, other.type) &&
+                        Objects.equals(queryId, other.queryId) &&
+                        Objects.equals(query, other.query);
             }
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder string = new StringBuilder();
+            string.append("Query Event {\n")
+                  .append("    Rya Instance: ").append(ryaInstance).append(",\n")
+                  .append("    Type: ").append(type).append(",\n");
+            switch(type) {
+                case EXECUTING:
+                    append(string, query.get());
+                    break;
+                case STOPPED:
+                    string.append("    Query ID: ").append(queryId.get()).append("\n");
+                    break;
+                case STOP_ALL:
+                    break;
+                default:
+                    // Default to showing everything that is in the object.
+                    string.append("    Query ID: ").append(queryId.get()).append("\n");
+                    append(string, query.get());
+                    break;
+            }
+            string.append("}");
+            return string.toString();
+        }
+
+        private void append(final StringBuilder string, final StreamsQuery query) {
+            requireNonNull(string);
+            requireNonNull(query);
+            string.append("    Streams Query {\n")
+                  .append("        Query ID: ").append(query.getQueryId()).append(",\n")
+                  .append("        Is Active: ").append(query.isActive()).append(",\n")
+                  .append("        SPARQL: ").append(query.getSparql()).append("\n")
+                  .append("    }");
+        }
+
+        /**
+         * Create a {@link QueryEvent} that indicates a query needs to be executing.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @param query - The StreamsQuery that defines what should be executing. (not null)
+         * @return A {@link QueryEvent} built using the provided values.
+         */
+        public static QueryEvent executing(final String ryaInstance, final StreamsQuery query) {
+            return new QueryEvent(ryaInstance, QueryEventType.EXECUTING, Optional.empty(), Optional.of(query));
+        }
+
+        /**
+         * Create a {@link QueryEvent} that indicates a query needs to be stopped.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @param queryId - The ID of the query that must not be running. (not null)
+         * @return A {@link QueryEvent} built using the provided values.
+         */
+        public static QueryEvent stopped(final String ryaInstance, final UUID queryId) {
+            return new QueryEvent(ryaInstance, QueryEventType.STOPPED, Optional.of(queryId), Optional.empty());
+        }
+
+        /**
+         * Create a {@link QueryEvent} that indicates all queries for a Rya instance needs to be stopped.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @return A {@link QueryEvent} built using the provided values.
+         */
+        public static QueryEvent stopALL(final String ryaInstance) {
+            return new QueryEvent(ryaInstance, QueryEventType.STOP_ALL, Optional.empty(), Optional.empty());
+        }
+    }
+
+    /**
+     * Listens to a {@link QueryChangeLogSource} and adds observations to the provided
+     * work queue. It does so until the provided shutdown signal is set.
+     */
+    @DefaultAnnotation(NonNull.class)
+    static class LogEventWorkGenerator implements SourceListener {
+
+        private final BlockingQueue<LogEvent> workQueue;
+        private final AtomicBoolean shutdownSignal;
+        private final long offerValue;
+        private final TimeUnit offerUnits;
+
+        /**
+         * Constructs an instance of {@link QueryManagerSourceListener}.
+         *
+         * @param workQueue - A blocking queue that will have {@link LogEvent}s offered to it. (not null)
+         * @param offerValue - How long to wait when offering new work.
+         * @param offerUnits - The unit for the {@code offerValue}. (not null)
+         * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
+         *   to the work queue because the application is shutting down. (not null)
+         */
+        public LogEventWorkGenerator(
+                final BlockingQueue<LogEvent> workQueue,
+                final long offerValue,
+                final TimeUnit offerUnits,
+                final AtomicBoolean shutdownSignal) {
+            this.workQueue = requireNonNull(workQueue);
+            this.shutdownSignal = requireNonNull(shutdownSignal);
+            this.offerValue = offerValue;
+            this.offerUnits = requireNonNull(offerUnits);
+        }
+
+        @Override
+        public void notifyCreate(final String ryaInstanceName, final QueryChangeLog changeLog) {
+            log.info("A new Query Change Log has been discovered for Rya Instance " + ryaInstanceName + ". All " +
+                    "queries that are set to active within it will be started.");
+
+            // Create an event that summarizes this notification.
+            final LogEvent event = LogEvent.create(ryaInstanceName, changeLog);
+
+            // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+            offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
         }
 
         @Override
         public void notifyDelete(final String ryaInstanceName) {
-            lock.lock();
+            log.info("The Query Change Log for Rya Instance " + ryaInstanceName + " has been deleted. All of the " +
+                    "queries related to that instance will be stopped.");
+
+            // Create an event that summarizes this notification.
+            final LogEvent event = LogEvent.delete(ryaInstanceName);
+
+            // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+            offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
+        }
+    }
+
+    /**
+     * Processes a work queue of {@link LogEvent}s.
+     * <p/>
+     * Whenever a new log has been created, then it registers a {@link QueryEventWorkGenerator}
+     * that generates {@link QueryEvent}s based on the content and updates to the discovered
+     * {@link QueryChagneLog}.
+     * <p/>
+     * Whenever a log is deleted, then the generator is stopped and a stop all {@link QueryEvent}
+     * is written to the work queue.
+     */
+    @DefaultAnnotation(NonNull.class)
+    static class LogEventWorker implements Runnable {
+
+        /**
+         * A map of Rya Instance name to he Query Repository for that instance.
+         */
+        private final Map<String, QueryRepository> repos = new HashMap<>();
+
+        private final BlockingQueue<LogEvent> logWorkQueue;
+        private final BlockingQueue<QueryEvent> queryWorkQueue;
+        private final long blockingValue;
+        private final TimeUnit blockingUnits;
+        private final AtomicBoolean shutdownSignal;
+
+        /**
+         * Constructs an instance of {@link LogEventWorker}.
+         *
+         * @param logWorkQueue - A queue of {@link LogEvent}s that will be worked by this object. (not null)
+         * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
+         * @param blockingValue - How long to wait when polling/offering new work.
+         * @param blockingUnits - The unit for the {@code blockingValue}. (not null)
+         * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
+         *   may exit the {@link #run()} method. (not null)
+         */
+        public LogEventWorker(
+                final BlockingQueue<LogEvent> logWorkQueue,
+                final BlockingQueue<QueryEvent> queryWorkQueue,
+                final long blockingValue,
+                final TimeUnit blockingUnits,
+                final AtomicBoolean shutdownSignal) {
+            this.logWorkQueue = requireNonNull(logWorkQueue);
+            this.queryWorkQueue = requireNonNull(queryWorkQueue);
+            this.blockingValue = blockingValue;
+            this.blockingUnits = requireNonNull(blockingUnits);
+            this.shutdownSignal = requireNonNull(shutdownSignal);
+        }
+
+        @Override
+        public void run() {
+            // Run until the shutdown signal is set.
+            while(!shutdownSignal.get()) {
+                try {
+                    // Pull a unit of work from the queue.
+                    log.debug("LogEventWorker - Polling the work queue for a new LogEvent.");
+                    final LogEvent logEvent = logWorkQueue.poll(blockingValue, blockingUnits);
+                    if(logEvent == null) {
+                        // Poll again if nothing was found.
+                        continue;
+                    }
+
+                    log.info("LogEventWorker - handling: \n" + logEvent);
+                    final String ryaInstance = logEvent.getRyaInstanceName();
+
+                    switch(logEvent.getEventType()) {
+                        case CREATE:
+                            // If we see a create message for a Rya Instance we are already maintaining,
+                            // then don't do anything.
+                            if(repos.containsKey(ryaInstance)) {
+                                log.warn("LogEventWorker - A repository is already being managed for the Rya Instance " +
+                                        ryaInstance + ". This message will be ignored.");
+                                continue;
+                            }
+
+                            // Create and start a QueryRepository for the discovered log. Hold onto the repository
+                            // so that it may be shutdown later.
+                            final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits);
+                            final QueryRepository repo = new InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler);
+                            repo.startAndWait();
+                            repos.put(ryaInstance, repo);
+
+                            // Subscribe a worker that adds the Query Events to the queryWorkQueue queue.
+                            // A count down latch is used to ensure the returned set of queries are handled
+                            // prior to any notifications from the repository.
+                            final CountDownLatch subscriptionWorkFinished = new CountDownLatch(1);
+                            final QueryEventWorkGenerator queryWorkGenerator =
+                                    new QueryEventWorkGenerator(ryaInstance, subscriptionWorkFinished, queryWorkQueue,
+                                            blockingValue, blockingUnits, shutdownSignal);
+
+                            log.debug("LogEventWorker - Setting up a QueryWorkGenerator...");
+                            final Set<StreamsQuery> queries = repo.subscribe(queryWorkGenerator);
+                            log.debug("LogEventWorker - Finished setting up a QueryWorkGenerator.");
+
+                            // Handle the view of the queries within the repository as it existed when
+                            // the subscription was registered.
+                            queries.stream()
+                            .forEach(query -> {
+                                // Create a QueryEvent that represents the active state of the existing query.
+                                final QueryEvent queryEvent = query.isActive() ?
+                                        QueryEvent.executing(ryaInstance, query) : QueryEvent.stopped(ryaInstance, query.getQueryId());
+                                log.debug("LogEventWorker - offering: " + queryEvent);
+
+                                // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+                                offerUntilAcceptedOrShutdown(queryWorkQueue, queryEvent, blockingValue, blockingUnits, shutdownSignal);
+                            });
+
+                            // Indicate the subscription work is finished so that the registered listener may start
+                            // adding work to the queue.
+                            log.info("LogEventWorker - Counting down the subscription work latch.");
+                            subscriptionWorkFinished.countDown();
+                            break;
+
+                        case DELETE:
+                            if(repos.containsKey(ryaInstance)) {
+                                // Shut down the query repository for the Rya instance. This ensures the listener will
+                                // not receive any more work that needs to be done.
+                                final QueryRepository deletedRepo = repos.remove(ryaInstance);
+                                deletedRepo.stopAndWait();
+
+                                // Add work that stops all of the queries related to the instance.
+                                final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
+                                offerUntilAcceptedOrShutdown(queryWorkQueue, stopAllEvent, blockingValue, blockingUnits, shutdownSignal);
+                            }
+                            break;
+                    }
+                } catch (final InterruptedException e) {
+                    log.debug("LogEventWorker did not see any new events over the past 5 seconds. Polling again...");
+                }
+            }
+
+            log.info("LogEventWorker shutting down...");
+
+            // Shutdown all of the QueryRepositories that were started.
+            repos.values().forEach(repo -> repo.stopAndWait());
+
+            log.info("LogEventWorker shut down.");
+        }
+    }
+
+    /**
+     * Listens to a {@link QueryRepository} and adds observations to the provided work queue.
+     * It does so until the provided shutdown signal is set.
+     */
+    @DefaultAnnotation(NonNull.class)
+    static class QueryEventWorkGenerator implements QueryChangeLogListener {
+
+        private final String ryaInstance;
+        private final CountDownLatch subscriptionWorkFinished;
+        private final BlockingQueue<QueryEvent> queryWorkQueue;
+        private final long blockingValue;
+        private final TimeUnit blockingUnits;
+        private final AtomicBoolean shutdownSignal;
+
+        /**
+         * Constructs an instance of {@link QueryEventWorkGenerator}.
+         *
+         * @param ryaInstance - The rya instance whose log this objects is watching. (not null)
+         * @param subscriptionWorkFinished - Indicates when work that needs to be completed before this
+         *   listener handles notifications is completed. (not null)
+         * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
+         * @param blockingValue - How long to wait when polling/offering new work.
+         * @param blockingUnits - The unit for the {@code blockingValue}. (not null)
+         * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
+         *   to the work queue because the application is shutting down. (not null)
+         */
+        public QueryEventWorkGenerator(
+                final String ryaInstance,
+                final CountDownLatch subscriptionWorkFinished,
+                final BlockingQueue<QueryEvent> queryWorkQueue,
+                final long blockingValue,
+                final TimeUnit blockingUnits,
+                final AtomicBoolean shutdownSignal) {
+            this.ryaInstance = requireNonNull(ryaInstance);
+            this.subscriptionWorkFinished = requireNonNull(subscriptionWorkFinished);
+            this.queryWorkQueue = requireNonNull(queryWorkQueue);
+            this.blockingValue = blockingValue;
+            this.blockingUnits = requireNonNull(blockingUnits);
+            this.shutdownSignal = requireNonNull(shutdownSignal);
+        }
+
+        @Override
+        public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
+            requireNonNull(queryChangeEvent);
+            requireNonNull(newQueryState);
+
+            // Wait for the subscription work to be finished.
             try {
-                LOG.info("Notified of deleting QueryChangeLog, stopping all queries belonging to the change log for "
-                        + ryaInstanceName + ".");
-                queryExecutor.stopAll(ryaInstanceName);
-            } catch (final QueryExecutorException e) {
-                LOG.error("Failed to stop all queries belonging to: " + ryaInstanceName, e);
-            } finally {
-                lock.unlock();
+                log.debug("Waiting for Subscription Work Finished latch to release...");
+                while(!shutdownSignal.get() && !subscriptionWorkFinished.await(blockingValue, blockingUnits)) {
+                    log.debug("Still waiting...");
+                }
+                log.debug("Subscription Work Finished latch to released.");
+            } catch (final InterruptedException e) {
+                log.warn("Interrupted while waiting for the Subscription Work Finished latch to be " +
+                        "released. Shutting down?", e);
+            }
+
+            // If we left the loop because of a shutdown, return immediately.
+            if(shutdownSignal.get()) {
+                log.debug("Not processing notification. Shutting down.");
+                return;
+            }
+
+            // Generate work from the notification.
+            final QueryChange change = queryChangeEvent.getEntry();
+            switch(change.getChangeType()) {
+                case CREATE:
+                    if(newQueryState.isPresent()) {
+                        log.info("Rya Instance " + ryaInstance + " created Rya Streams query " + newQueryState + ".");
+                        final StreamsQuery newQuery = newQueryState.get();
+                        if(newQuery.isActive()) {
+                            final QueryEvent executeNewQuery = QueryEvent.executing(ryaInstance, newQuery);
+                            offerUntilAcceptedOrShutdown(queryWorkQueue, executeNewQuery, blockingValue, blockingUnits, shutdownSignal);
+                        }
+                    } else {
+                        log.error("Received a CREATE QueryChange for Rya Instance: " + ryaInstance +
+                                ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
+                                "StreamsQuery representing the created query. The query will not be processed.");
+                    }
+                    break;
+
+                case DELETE:
+                    final UUID deletedQueryId = change.getQueryId();
+                    log.info("Rya Instance " + ryaInstance + " deleted Rya Streams query with ID " + deletedQueryId);
+                    final QueryEvent stopDeletedQuery = QueryEvent.stopped(ryaInstance, deletedQueryId);
+                    offerUntilAcceptedOrShutdown(queryWorkQueue, stopDeletedQuery, blockingValue, blockingUnits, shutdownSignal);
+                    break;
+
+                case UPDATE:
+                    if(newQueryState.isPresent()) {
+                        final StreamsQuery updatedQuery = newQueryState.get();
+                        if(updatedQuery.isActive()) {
+                            log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
+                                    updatedQuery.getQueryId() + " to be active.");
+                            final QueryEvent executeUpdatedQuery = QueryEvent.executing(ryaInstance, updatedQuery);
+                            offerUntilAcceptedOrShutdown(queryWorkQueue, executeUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
+                        } else {
+                            log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
+                                    updatedQuery.getQueryId() + " to be inactive.");
+                            final QueryEvent stopUpdatedQuery = QueryEvent.stopped(ryaInstance, updatedQuery.getQueryId());
+                            offerUntilAcceptedOrShutdown(queryWorkQueue, stopUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
+                        }
+                    } else {
+                        log.error("Received an UPDATE QueryChange for Rya Instance: " + ryaInstance +
+                                ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
+                                "StreamsQuery representing the created query. The query will not be processed.");
+                    }
+                    break;
             }
         }
     }
-}
+
+    /**
+     * Processes a work queue of {@link QueryEvent}s.
+     * <p/>
+     * Each type of event maps the to corresponding method on {@link QueryExecutor} that is called into.
+     */
+    @DefaultAnnotation(NonNull.class)
+    static class QueryEventWorker implements Runnable {
+
+        private final BlockingQueue<QueryEvent> workQueue;
+        private final QueryExecutor queryExecutor;
+        private final long pollingValue;
+        private final TimeUnit pollingUnits;
+        private final AtomicBoolean shutdownSignal;
+
+        /**
+         * Constructs an instance of {@link QueryEventWorker}.
+         *
+         * @param workQueue - A queue of {@link QueryEvent}s that will be worked by this object. (not null)
+         * @param queryExecutor - Responsible for executing the {@link StreamsQuery}s. (not null)
+         * @param pollingValue - How long to wait when polling for new work.
+         * @param pollingUnits - The units for the {@code pollingValue}. (not null)
+         * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
+         *   may exit the {@link #run()} method. (not null)
+         */
+        public QueryEventWorker(
+                final BlockingQueue<QueryEvent> workQueue,
+                final QueryExecutor queryExecutor,
+                final long pollingValue,
+                final TimeUnit pollingUnits,
+                final AtomicBoolean shutdownSignal) {
+            this.workQueue = requireNonNull(workQueue);
+            this.queryExecutor = requireNonNull(queryExecutor);
+            this.pollingValue = pollingValue;
+            this.pollingUnits = requireNonNull(pollingUnits);
+            this.shutdownSignal = requireNonNull(shutdownSignal);
+        }
+
+        @Override
+        public void run() {
+            log.info("QueryEventWorker starting.");
+
+            // Run until the shutdown signal is set.
+            while(!shutdownSignal.get()) {
+                // Pull a unit of work from the queue.
+                try {
+                    log.debug("Polling the work queue for a new QueryEvent.");
+                    final QueryEvent event = workQueue.poll(pollingValue, pollingUnits);
+                    if(event == null) {
+                        // Poll again if nothing was found.
+                        continue;
+                    }
+
+                    log.info("QueryEventWorker handling:\n" + event);
+
+                    // Ensure the state within the executor matches the query event's state.
+                    switch(event.getType()) {
+                        case EXECUTING:
+                            try {
+                                queryExecutor.startQuery(event.getRyaInstance(), event.getStreamsQuery().get());
+                            } catch (final IllegalStateException | QueryExecutorException e) {
+                                log.error("Could not start a query represented by the following work: " + event, e);
+                            }
+                            break;
+
+                        case STOPPED:
+                            try {
+                                queryExecutor.stopQuery(event.getQueryId().get());
+                            } catch (final IllegalStateException | QueryExecutorException e) {
+                                log.error("Could not stop a query represented by the following work: " + event, e);
+                            }
+                            break;
+
+                        case STOP_ALL:
+                            try {
+                                queryExecutor.stopAll(event.getRyaInstance());
+                            } catch (final IllegalStateException | QueryExecutorException e) {
+                                log.error("Could not stop all queries represented by the following work: " + event, e);
+                            }
+                        break;
+                    }
+                } catch (final InterruptedException e) {
+                    log.debug("QueryEventWorker interrupted. Probably shutting down.");
+                }
+            }
+            log.info("QueryEventWorker shut down.");
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
index 515d699..04a0382 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
@@ -33,6 +33,7 @@
 import org.apache.commons.daemon.DaemonInitException;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory;
 import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
 import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource;
 import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor;
 import org.apache.rya.streams.querymanager.xml.Kafka;
@@ -91,7 +92,7 @@
 
         // Unmarshall the configuration file into an object.
         final QueryManagerConfig config;
-        try(InputStream stream = Files.newInputStream(configFile)) {
+        try(final InputStream stream = Files.newInputStream(configFile)) {
             config = QueryManagerConfigUnmarshaller.unmarshall(stream);
         } catch(final JAXBException | SAXException e) {
             throw new DaemonInitException("Unable to marshall the configuration XML file: " + configFile, e);
@@ -110,11 +111,12 @@
         final QueryChangeLogSource source = new KafkaQueryChangeLogSource(kafka.getHostname(), kafka.getPort(), scheduler);
 
         // Initialize a QueryExecutor.
+        final String zookeeperServers = config.getQueryExecutor().getLocalKafkaStreams().getZookeepers();
         final KafkaStreamsFactory streamsFactory = new SingleThreadKafkaStreamsFactory(kafka.getHostname() + ":" + kafka.getPort());
-        final QueryExecutor queryExecutor = new LocalQueryExecutor(streamsFactory);
+        final QueryExecutor queryExecutor = new LocalQueryExecutor(new CreateKafkaTopic(zookeeperServers), streamsFactory);
 
         // Initialize the QueryManager using the configured resources.
-        manager = new QueryManager(queryExecutor, source, scheduler);
+        manager = new QueryManager(queryExecutor, source, period, units);
     }
 
     @Override
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
index 32305f5..e746baf 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
@@ -33,11 +33,12 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
 import org.apache.rya.streams.querymanager.QueryChangeLogSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractScheduledService;
@@ -53,6 +54,8 @@
 @DefaultAnnotation(NonNull.class)
 public class KafkaQueryChangeLogSource extends AbstractScheduledService implements QueryChangeLogSource {
 
+    private static final Logger log = LoggerFactory.getLogger(KafkaQueryChangeLogSource.class);
+
     /**
      * Ensures thread safe interactions with this object.
      */
@@ -74,10 +77,10 @@
     private final Set<SourceListener> listeners = new HashSet<>();
 
     /**
-     * Maps Rya instance name to a Query Change Log for that instance. This map is used to keep
-     * track of how the change logs change over time within the Kafka Server.
+     * Maps Rya instance names to the Query Change Log topic name in Kafka. This map is used to
+     * keep track of how the change logs change over time within the Kafka Server.
      */
-    private final HashMap<String, QueryChangeLog> knownChangeLogs = new HashMap<>();
+    private final HashMap<String, String> knownChangeLogs = new HashMap<>();
 
     /**
      * A consumer that is used to poll the Kafka Server for topics.
@@ -101,6 +104,8 @@
 
     @Override
     protected void startUp() throws Exception {
+        log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " starting up...");
+
         // Setup the consumer that is used to list topics for the source.
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
@@ -108,17 +113,23 @@
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         listTopicsConsumer = new KafkaConsumer<>(consumerProperties);
+
+        log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " started.");
     }
 
     @Override
     protected void shutDown() throws Exception {
-        // Shut down the consumer that's used to list topics.
-        listTopicsConsumer.close();
+        log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " shutting down...");
 
-        // Shut down all of the change logs that were created within this class.
-        for(final QueryChangeLog changeLog : knownChangeLogs.values()) {
-            changeLog.close();
+        lock.lock();
+        try {
+            // Shut down the consumer that's used to list topics.
+            listTopicsConsumer.close();
+        } finally {
+            lock.unlock();
         }
+
+        log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " shut down.");
     }
 
     @Override
@@ -130,8 +141,10 @@
             listeners.add(listener);
 
             // Notify it with everything that already exists.
-            for(final Entry<String, QueryChangeLog> entry : knownChangeLogs.entrySet()) {
-                listener.notifyCreate(entry.getKey(), entry.getValue());
+            for(final Entry<String, String> entry : knownChangeLogs.entrySet()) {
+                final String changeLogTopic = entry.getValue();
+                final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
+                listener.notifyCreate(entry.getKey(), changeLog);
             }
         } finally {
             lock.unlock();
@@ -174,26 +187,23 @@
             // Handle the deletes.
             for(final String deletedRyaInstance : deletedRyaInstances) {
                 // Remove the change log from the set of known logs.
-                final QueryChangeLog removed = knownChangeLogs.remove(deletedRyaInstance);
+                knownChangeLogs.remove(deletedRyaInstance);
 
-                // Notify the listeners of the update.
+                // Notify the listeners of the update so that they may close the previously provided change log.
                 for(final SourceListener listener : listeners) {
                     listener.notifyDelete(deletedRyaInstance);
                 }
-
-                // Ensure the change log is closed.
-                removed.close();
             }
 
             // Handle the adds.
             for(final String createdRyaInstance : createdRyaInstances) {
                 // Create and store the ChangeLog.
                 final String changeLogTopic = KafkaTopics.queryChangeLogTopic(createdRyaInstance);
-                final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
-                knownChangeLogs.put(createdRyaInstance, changeLog);
+                knownChangeLogs.put(createdRyaInstance, changeLogTopic);
 
                 // Notify the listeners of the update.
                 for(final SourceListener listener : listeners) {
+                    final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
                     listener.notifyCreate(createdRyaInstance, changeLog);
                 }
             }
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
index 947a215..3a59636 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -32,10 +32,15 @@
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory.KafkaStreamsFactoryException;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
 import org.apache.rya.streams.querymanager.QueryExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractIdleService;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -50,6 +55,7 @@
  */
 @DefaultAnnotation(NonNull.class)
 public class LocalQueryExecutor extends AbstractIdleService implements QueryExecutor {
+    private static final Logger log = LoggerFactory.getLogger(LocalQueryExecutor.class);
 
     /**
      * Provides thread safety when interacting with this class.
@@ -72,6 +78,11 @@
     private final Map<UUID, KafkaStreams> byQueryId = new HashMap<>();
 
     /**
+     * Used to create the input and output topics for a Kafka Streams job.
+     */
+    private final CreateKafkaTopic createKafkaTopic;
+
+    /**
      * Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s.
      */
     private final KafkaStreamsFactory streamsFactory;
@@ -79,23 +90,31 @@
     /**
      * Constructs an instance of {@link LocalQueryExecutor}.
      *
+     * @param createKafkaTopic - Used to create the input and output topics for a Kafka Streams job. (not null)
      * @param streamsFactory - Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s. (not null)
      */
-    public LocalQueryExecutor(final KafkaStreamsFactory streamsFactory) {
+    public LocalQueryExecutor(
+            final CreateKafkaTopic createKafkaTopic,
+            final KafkaStreamsFactory streamsFactory) {
+        this.createKafkaTopic = requireNonNull(createKafkaTopic);
         this.streamsFactory = requireNonNull(streamsFactory);
     }
 
     @Override
     protected void startUp() throws Exception {
-        // Nothing to do.
+        log.info("Local Query Executor starting up.");
     }
 
     @Override
     protected void shutDown() throws Exception {
+        log.info("Local Query Executor shutting down. Stopping all jobs...");
+
         // Stop all of the running queries.
         for(final KafkaStreams job : byQueryId.values()) {
             job.close();
         }
+
+        log.info("Local Query Executor shut down.");
     }
 
     @Override
@@ -106,6 +125,14 @@
 
         lock.lock();
         try {
+            // Make sure the Statements topic exists for the query.
+            final Set<String> topics = Sets.newHashSet(
+                    KafkaTopics.statementsTopic(ryaInstance),
+                    KafkaTopics.queryResultsTopic(query.getQueryId()));
+
+            // Make sure the Query Results topic exists for the query.
+            createKafkaTopic.createTopics(topics, 1, 1);
+
             // Setup the Kafka Streams job that will execute.
             final KafkaStreams streams = streamsFactory.make(ryaInstance, query);
             streams.start();
diff --git a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
index c1285d4..21170bb 100644
--- a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
+++ b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
@@ -30,6 +30,13 @@
             </xs:choice>
           </xs:complexType>
         </xs:element>
+        <xs:element name="queryExecutor">
+            <xs:complexType>
+              <xs:choice>
+                <xs:element name="localKafkaStreams" type="localKafkaStreams"/>
+              </xs:choice>
+            </xs:complexType>
+        </xs:element>
         <xs:element name="performanceTunning">
           <xs:complexType>
             <xs:sequence>
@@ -65,6 +72,13 @@
     </xs:sequence>
   </xs:complexType>
   
+  <!-- Define what a local Kafka Streams query executor looks like. -->
+  <xs:complexType name="localKafkaStreams">
+    <xs:sequence>
+      <xs:element name="zookeepers" type="xs:string"/>
+    </xs:sequence>
+  </xs:complexType>
+  
   <!-- Define the legal range for a TCP port. -->
   <xs:simpleType name="tcpPort">
     <xs:restriction base="xs:int">
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
new file mode 100644
index 0000000..da9be78
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent.LogEventType;
+import org.apache.rya.streams.querymanager.QueryManager.LogEventWorkGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link LogEventWorkGenerator}.
+ */
+public class LogEventWorkGeneratorTest {
+
+    @Test
+    public void shutdownSignalKillsThread() {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the LogEventWorkGenerator work.
+        final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created change log.
+        final Thread notifyThread = new Thread(() -> {
+            generator.notifyCreate("rya", mock(QueryChangeLog.class));
+        });
+
+        // Fill the queue so that nothing may be offered to it.
+        queue.offer(LogEvent.delete("rya"));
+
+        // Start the thread and show that it is still alive after the offer period.
+        notifyThread.start();
+        assertTrue( ThreadUtil.stillAlive(notifyThread, 200) );
+
+        // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) );
+    }
+
+    @Test
+    public void notifyCreate() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the LogEventWorkGenerator work.
+        final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created change log.
+        final CountDownLatch notified = new CountDownLatch(1);
+        final Thread notifyThread = new Thread(() -> {
+            generator.notifyCreate("rya", mock(QueryChangeLog.class));
+            notified.countDown();
+        });
+
+        try {
+            // Start the thread that performs the notification.
+            notifyThread.start();
+
+            // Wait for the thread to indicate it has notified and check the queue for the value.
+            notified.await(200, TimeUnit.MILLISECONDS);
+            final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS);
+            assertEquals(LogEventType.CREATE, event.getEventType());
+            assertEquals("rya", event.getRyaInstanceName());
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyDelete() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the LogEventWorkGenerator work.
+        final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a deleted change log.
+        final CountDownLatch notified = new CountDownLatch(1);
+        final Thread notifyThread = new Thread(() -> {
+            generator.notifyDelete("rya");
+            notified.countDown();
+        });
+
+        try {
+            // Start the thread that performs the notification.
+            notifyThread.start();
+
+            // Wait for the thread to indicate it has notified and check the queue for the value.
+            notified.await(200, TimeUnit.MILLISECONDS);
+            final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS);
+            assertEquals(LogEventType.DELETE, event.getEventType());
+            assertEquals("rya", event.getRyaInstanceName());
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
new file mode 100644
index 0000000..cb708ed
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent;
+import org.apache.rya.streams.querymanager.QueryManager.LogEventWorker;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link LogEventWorker}.
+ */
+public class LogEventWorkerTest {
+
+    @Test
+    public void shutdownSignalKillsThread() {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The thread that will perform the LogEventWorker task.
+        final Thread logEventWorker = new Thread(new LogEventWorker(new ArrayBlockingQueue<>(1),
+                new ArrayBlockingQueue<>(1), 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        // Wait longer than the poll time to see if the thread died. Show that it is still running.
+        assertTrue(ThreadUtil.stillAlive(logEventWorker, 200));
+
+        // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse(ThreadUtil.stillAlive(logEventWorker, 500));
+    }
+
+    @Test
+    public void nofity_logCreated_doesNotExist() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The Query Change Log that will be watched.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+        // Write a message that indicates a new query should be active.
+        final UUID firstQueryId = UUID.randomUUID();
+        changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+
+        // Write a message that adds an active query, but then makes it inactive. Because both of these
+        // events are written to the log before the worker subscribes to the repository for updates, they
+        // must result in a single query stopped event.
+        final UUID secondQueryId = UUID.randomUUID();
+        changeLog.write(QueryChange.create(secondQueryId, "select * where { ?d ?e ?f . }", true));
+        changeLog.write(QueryChange.update(secondQueryId, false));
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was created.
+            final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+            logEventQueue.offer(createLogEvent);
+
+            // We must see the following Query Events added to the work queue.
+            // Query 1, executing.
+            // Query 2, stopped.
+            Set<QueryEvent> expectedEvents = new HashSet<>();
+            expectedEvents.add(QueryEvent.executing("rya",
+                    new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+            expectedEvents.add(QueryEvent.stopped("rya", secondQueryId));
+
+            Set<QueryEvent> queryEvents = new HashSet<>();
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+            assertEquals(expectedEvents, queryEvents);
+
+            // Write an event to the change log that stops the first query.
+            changeLog.write(QueryChange.update(firstQueryId, false));
+
+            // Show it was also reflected in the changes.
+            // Query 1, stopped.
+            expectedEvents = new HashSet<>();
+            expectedEvents.add(QueryEvent.stopped("rya", firstQueryId));
+
+            queryEvents = new HashSet<>();
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+            assertEquals(expectedEvents, queryEvents);
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+
+    @Test
+    public void nofity_logCreated_exists() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The Query Change Log that will be watched.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+        // Write a message that indicates a new query should be active.
+        final UUID firstQueryId = UUID.randomUUID();
+        changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was created.
+            final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+            logEventQueue.offer(createLogEvent);
+
+            // Say the same log was created a second time.
+            logEventQueue.offer(createLogEvent);
+
+            // Show that only a single unit of work was added for the log. This indicates the
+            // second message was effectively skipped as it would have add its work added twice otherwise.
+            final Set<QueryEvent> expectedEvents = new HashSet<>();
+            expectedEvents.add(QueryEvent.executing("rya",
+                    new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+
+            final Set<QueryEvent> queryEvents = new HashSet<>();
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+            assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+            assertEquals(expectedEvents, queryEvents);
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+
+    @Test
+    public void notify_logDeleted_exists() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The Query Change Log that will be watched.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was created.
+            final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+            logEventQueue.offer(createLogEvent);
+
+            // Write a unit of work that indicates a log was deleted.
+            logEventQueue.offer(LogEvent.delete("rya"));
+
+            // Show that a single unit of work was created for deleting everything for "rya".
+            assertEquals(QueryEvent.stopALL("rya"),  queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+            assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+
+    @Test
+    public void notify_logDeleted_doesNotExist() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was deleted. Since it was never created,
+            // this will not cause anything to be written to the QueryEvent queue.
+            logEventQueue.offer(LogEvent.delete("rya"));
+
+            // Show that a single unit of work was created for deleting everything for "rya".
+            assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
new file mode 100644
index 0000000..4495e19
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEventWorkGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link QueryEventWorkGenerator}.
+ */
+public class QueryEventWorkGeneratorTest {
+
+    @Test
+    public void shutdownSignalKillsThread() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", new CountDownLatch(1), queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created query.
+        final Thread notifyThread = new Thread(() -> {
+            generator.notify(mock(ChangeLogEntry.class), Optional.empty());
+        });
+
+        // Fill the queue so that nothing may be offered to it.
+        queue.offer(QueryEvent.stopALL("rya"));
+
+        // Start the thread and show that it is still alive after the offer period.
+        notifyThread.start();
+        assertTrue( ThreadUtil.stillAlive(notifyThread, 200) );
+
+        // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) );
+    }
+
+    @Test
+    public void waitsForSubscriptionWork() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created query.
+        final UUID queryId = UUID.randomUUID();
+        final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+            generator.notify(entry, Optional.of(query));
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Wait longer than the blocking period and show the thread is still alive and nothing has been added
+            // to the work queue.
+            Thread.sleep(150);
+            assertTrue( notifyThread.isAlive() );
+
+            // Count down the latch.
+            latch.countDown();
+
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyCreate() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        latch.countDown();
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created query.
+        final UUID queryId = UUID.randomUUID();
+        final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+            generator.notify(entry, Optional.of(query));
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyDelete() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        latch.countDown();
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a deleted query.
+        final UUID queryId = UUID.randomUUID();
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.delete(queryId);
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+            generator.notify(entry, Optional.empty());
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.stopped("rya", queryId);
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyUpdate_isActive() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        latch.countDown();
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with an update query change.
+        final UUID queryId = UUID.randomUUID();
+        final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.update(queryId, true);
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+            generator.notify(entry, Optional.of(query));
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyUpdate_isNotActive() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        latch.countDown();
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with an update query change.
+        final UUID queryId = UUID.randomUUID();
+        final StreamsQuery query = new StreamsQuery(queryId, "query", false);
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.update(queryId, false);
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+            generator.notify(entry, Optional.of(query));
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.stopped("rya", queryId);
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
new file mode 100644
index 0000000..33c0719
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEventWorker;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link QueryManager.QueryEventWorker}.
+ */
+public class QueryEventWorkerTest {
+
+    @Test
+    public void shutdownSignalKillsThread() {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The thread that will perform the QueryEventWorker task.
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(new ArrayBlockingQueue<>(1),
+                mock(QueryExecutor.class), 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        queryEventWorker.start();
+
+        // Wait longer than the poll time to see if the thread died. Show that it is still running.
+        assertTrue(ThreadUtil.stillAlive(queryEventWorker, 200));
+
+        // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse(ThreadUtil.stillAlive(queryEventWorker, 1000));
+    }
+
+    @Test
+    public void executingWork() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to send the execute work to the thread.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The message that indicates a query needs to be executed.
+        final String ryaInstance = "rya";
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "sparql", true);
+        final QueryEvent executingEvent = QueryEvent.executing(ryaInstance, query);
+
+        // Release a latch if the startQuery method on the queryExecutor is invoked with the correct values.
+        final CountDownLatch startQueryInvoked = new CountDownLatch(1);
+        final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+        doAnswer(invocation -> {
+            startQueryInvoked.countDown();
+            return null;
+        }).when(queryExecutor).startQuery(ryaInstance, query);
+
+        // The thread that will perform the QueryEventWorker task.
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+                queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        try {
+            queryEventWorker.start();
+
+            // Provide a message indicating a query needs to be executing.
+            queue.put(executingEvent);
+
+            // Verify the Query Executor was told to start the query.
+            assertTrue( startQueryInvoked.await(150, TimeUnit.MILLISECONDS) );
+        } finally {
+            shutdownSignal.set(true);
+            queryEventWorker.join();
+        }
+    }
+
+    @Test
+    public void stoppedWork() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to send the execute work to the thread.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The message that indicates a query needs to be stopped.
+        final UUID queryId = UUID.randomUUID();
+        final QueryEvent stoppedEvent = QueryEvent.stopped("rya", queryId);
+
+        // Release a latch if the stopQuery method on the queryExecutor is invoked with the correct values.
+        final CountDownLatch stopQueryInvoked = new CountDownLatch(1);
+        final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+        doAnswer(invocation -> {
+            stopQueryInvoked.countDown();
+            return null;
+        }).when(queryExecutor).stopQuery(queryId);
+
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+                queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        try {
+            // The thread that will perform the QueryEventWorker task.
+            queryEventWorker.start();
+
+            // Provide a message indicating a query needs to be executing.
+            queue.put(stoppedEvent);
+
+            // Verify the Query Executor was told to stop the query.
+            assertTrue( stopQueryInvoked.await(150, TimeUnit.MILLISECONDS) );
+        } finally {
+            shutdownSignal.set(true);
+            queryEventWorker.join();
+        }
+    }
+
+    @Test
+    public void stopAllWork() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to send the execute work to the thread.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The message that indicates all queries for a rya instance need to be stopped.
+        final String ryaInstance = "rya";
+        final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
+
+        // Release a latch if the stopQuery method on the queryExecutor is invoked with the correct values.
+        final CountDownLatch testMethodInvoked = new CountDownLatch(1);
+        final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+        doAnswer(invocation -> {
+            testMethodInvoked.countDown();
+            return null;
+        }).when(queryExecutor).stopAll(ryaInstance);
+
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+                queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        try {
+            // The thread that will perform the QueryEventWorker task.
+            queryEventWorker.start();
+
+            // Provide a message indicating a query needs to be executing.
+            queue.put(stopAllEvent);
+
+            // Verify the Query Executor was told to stop all the queries.
+            assertTrue( testMethodInvoked.await(150, TimeUnit.MILLISECONDS) );
+        } finally {
+            shutdownSignal.set(true);
+            queryEventWorker.join();
+        }
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
index a1203a0..04e70c0 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
@@ -1,18 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.rya.streams.querymanager;
 
@@ -34,13 +36,10 @@
 import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
 import org.junit.Test;
 
-import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
-
 /**
- * Test for the {@link QueryManager}
+ * Unit tests the methods of {@link QueryManager}.
  */
 public class QueryManagerTest {
-    private static final Scheduler TEST_SCHEDULER = Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS);
 
     /**
      * Tests when the query manager is notified to create a new query, the query
@@ -74,7 +73,7 @@
             return null;
         }).when(source).subscribe(any(SourceListener.class));
 
-        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
         try {
             qm.startAndWait();
             queryStarted.await(5, TimeUnit.SECONDS);
@@ -128,7 +127,7 @@
             return null;
         }).when(source).subscribe(any(SourceListener.class));
 
-        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
         try {
             qm.startAndWait();
             queryDeleted.await(5, TimeUnit.SECONDS);
@@ -183,7 +182,7 @@
             return null;
         }).when(source).subscribe(any(SourceListener.class));
 
-        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
         try {
             qm.startAndWait();
             queryDeleted.await(10, TimeUnit.SECONDS);
@@ -192,4 +191,4 @@
             qm.stopAndWait();
         }
     }
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
new file mode 100644
index 0000000..9896e31
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Utilities that are useful for interacting with {@link Thread}s while testing.
+ */
+public class ThreadUtil {
+
+    /**
+     * Private constructor to prevent instantiation.
+     */
+    private ThreadUtil() { }
+
+    /**
+     * A utility function that returns whether a thread is alive or not after waiting
+     * some specified period of time to join it.
+     *
+     * @param thread - The thread that will be joined. (not null)
+     * @param millis - How long to wait to join the thread.
+     * @return {@code true} if the thread is still alive, otherwise {@code false}.
+     */
+    public static boolean stillAlive(final Thread thread, final long millis) {
+        requireNonNull(thread);
+        try {
+            thread.join(millis);
+        } catch (final InterruptedException e) { }
+        return thread.isAlive();
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index 3cbe894..f9c8a03 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -35,6 +35,7 @@
 import org.apache.rya.streams.kafka.KafkaStreamsFactory;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
 import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
@@ -119,9 +120,10 @@
         expected.add(new VisibilityBindingSet(bs, "a"));
 
         // Start the executor that will be tested.
+        final CreateKafkaTopic createKafkaTopic = new CreateKafkaTopic( kafka.getZookeeperServers() );
         final String kafkaServers = kafka.getKafkaHostname() + ":" + kafka.getKafkaPort();
         final KafkaStreamsFactory jobFactory = new SingleThreadKafkaStreamsFactory(kafkaServers);
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(createKafkaTopic, jobFactory);
         executor.startAndWait();
         try {
             // Start the query.
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
index 0df5794..c0f888e 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
@@ -32,6 +32,7 @@
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
 import org.apache.rya.streams.querymanager.QueryExecutor;
 import org.junit.Test;
 
@@ -44,7 +45,7 @@
 
     @Test(expected = IllegalStateException.class)
     public void startQuery_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true));
     }
 
@@ -60,7 +61,7 @@
         when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the query.
@@ -75,14 +76,14 @@
 
     @Test(expected = IllegalStateException.class)
     public void stopQuery_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.stopQuery(UUID.randomUUID());
     }
 
     @Test
     public void stopQuery_queryNotRunning() throws Exception {
         // Start an executor.
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.startAndWait();
         try {
             // Try to stop a query that was never stareted.
@@ -104,7 +105,7 @@
         when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the query.
@@ -122,7 +123,7 @@
 
     @Test(expected = IllegalStateException.class)
     public void stopAll_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.stopAll("rya");
     }
 
@@ -141,7 +142,7 @@
         when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(queryJob2);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the queries.
@@ -180,7 +181,7 @@
         when(jobFactory.make(eq(ryaInstance2), eq(query2))).thenReturn(queryJob2);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the queries.
@@ -205,14 +206,14 @@
 
     @Test(expected = IllegalStateException.class)
     public void getRunningQueryIds_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.getRunningQueryIds();
     }
 
     @Test
     public void getRunningQueryIds_noneStarted() throws Exception {
         // Start an executor.
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.startAndWait();
         try {
             // Get the list of running queries.
@@ -240,7 +241,7 @@
         when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Start the queries.
@@ -275,7 +276,7 @@
         when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Start the queries.
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
index f2b50ab..de6b9f3 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
@@ -45,6 +45,11 @@
                 "            <port>6</port>\n" +
                 "        </kafka>\n" +
                 "    </queryChangeLogSource>\n" +
+                "    <queryExecutor>\n" +
+                "        <localKafkaStreams>\n" +
+                "            <zookeepers>zoo1,zoo2,zoo3</zookeepers>\n" +
+                "        </localKafkaStreams>\n" +
+                "    </queryExecutor>\n" +
                 "    <performanceTunning>\n" +
                 "        <queryChanngeLogDiscoveryPeriod>\n" +
                 "            <value>1</value>\n" +