RYA-450 Implemented a Kafka backed QueryChangeLogSource.

diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index 3e0df50..095465c 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,6 +20,7 @@
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
@@ -40,6 +41,8 @@
 @DefaultAnnotation(NonNull.class)
 public class KafkaTopics {
 
+    private static final String QUERY_CHANGE_LOG_TOPIC_SUFFIX = "-QueryChangeLog";
+
     /**
      * Creates the Kafka topic name that is used for a specific instance of Rya's {@link QueryChangeLog}.
      *
@@ -48,7 +51,28 @@
      */
     public static String queryChangeLogTopic(final String ryaInstance) {
         requireNonNull(ryaInstance);
-        return ryaInstance + "-QueryChangeLog";
+        return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX;
+    }
+
+    /**
+     * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}.
+     * <p/>
+     * This is the inverse function of {@link #queryChangeLogTopic(String)}.
+     *
+     * @param changeLogTopic - The topic to evaluate. (not null)
+     * @return If the topic is well formatted, then the Rya instance name that was part of the topic name.
+     */
+    public static Optional<String> getRyaInstance(final String changeLogTopic) {
+        requireNonNull(changeLogTopic);
+
+        // Return absent if the provided topic does not represent a query change log topic.
+        if(!changeLogTopic.endsWith(QUERY_CHANGE_LOG_TOPIC_SUFFIX)) {
+            return Optional.empty();
+        }
+
+        // Everything before the suffix is the Rya instance name.
+        final int endIndex = changeLogTopic.length() - QUERY_CHANGE_LOG_TOPIC_SUFFIX.length();
+        return Optional.of( changeLogTopic.substring(0, endIndex) );
     }
 
     /**
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
new file mode 100644
index 0000000..a057de7
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Optional;
+
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link KafkaTopics}.
+ */
+public class KafkaTopicsTest {
+
+    @Test
+    public void getRyaInstance_wellFormattedTopic() {
+        // Make a topic name using a Rya instance name.
+        final String ryaInstance = "test";
+        final String topicName = KafkaTopics.queryChangeLogTopic(ryaInstance);
+
+        // Show the rya instance name is able to be extracted from the topic.
+        final Optional<String> resolvedRyaInstance = KafkaTopics.getRyaInstance(topicName);
+        assertEquals(ryaInstance, resolvedRyaInstance.get());
+    }
+
+    @Test
+    public void getRyaInstance_invalidTopicName() {
+        // Make up an invalid topic name.
+        final String invalidTopic = "thisIsABadTopicName";
+
+        // Show there is no Rya Instance name in it.
+        final Optional<String> ryaInstance = KafkaTopics.getRyaInstance(invalidTopic);
+        assertFalse( ryaInstance.isPresent() );
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/pom.xml b/extras/rya.streams/query-manager/pom.xml
index 76e521d..d321ab5 100644
--- a/extras/rya.streams/query-manager/pom.xml
+++ b/extras/rya.streams/query-manager/pom.xml
@@ -37,15 +37,32 @@
         <!-- Rya dependencies -->
         <dependency>
             <groupId>org.apache.rya</groupId>
-            <artifactId>rya.streams.client</artifactId>
+            <artifactId>rya.streams.kafka</artifactId>
         </dependency>
-
+        
         <!-- Apache Daemon dependencies -->
         <dependency>
             <groupId>commons-daemon</groupId>
             <artifactId>commons-daemon</artifactId>
             <version>1.1.0</version>
         </dependency>
+        
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.test.kafka</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <!-- Add the XSD directory as a resource so that it will be packaged in the jar. 
@@ -121,4 +138,4 @@
             </plugin>
         </plugins>
      </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
new file mode 100644
index 0000000..32305f5
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Represents a Kafka Server that manages {@link KafkaQueryChangeLog}s.
+ * <p/>
+ * Thread safe.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaQueryChangeLogSource extends AbstractScheduledService implements QueryChangeLogSource {
+
+    /**
+     * Ensures thread safe interactions with this object.
+     */
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /**
+     * Used by the service to configure how often it polls the Kafka Server for topics.
+     */
+    private final Scheduler scheduler;
+
+    /**
+     * Which Kafka Server this source represents.
+     */
+    private final String kafkaBootstrapServer;
+
+    /**
+     * Listeners that need to be notified when logs are created/deleted.
+     */
+    private final Set<SourceListener> listeners = new HashSet<>();
+
+    /**
+     * Maps Rya instance name to a Query Change Log for that instance. This map is used to keep
+     * track of how the change logs change over time within the Kafka Server.
+     */
+    private final HashMap<String, QueryChangeLog> knownChangeLogs = new HashMap<>();
+
+    /**
+     * A consumer that is used to poll the Kafka Server for topics.
+     */
+    private KafkaConsumer<String, String> listTopicsConsumer;
+
+    /**
+     * Constructs an instance of {@link KafkaQueryChangeLogSource}.
+     *
+     * @param kafkaHostname - The hostname of the Kafka Server that is the source. (not null)
+     * @param kafkaPort - The port of the Kafka Server that is the source. (not null)
+     * @param scheduler - How frequently this source will poll the Kafka Server for topics. (not null)
+     */
+    public KafkaQueryChangeLogSource(
+            final String kafkaHostname,
+            final int kafkaPort,
+            final Scheduler scheduler) {
+        kafkaBootstrapServer = requireNonNull(kafkaHostname) + ":" + kafkaPort;
+        this.scheduler = requireNonNull(scheduler);
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+        // Setup the consumer that is used to list topics for the source.
+        final Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        listTopicsConsumer = new KafkaConsumer<>(consumerProperties);
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+        // Shut down the consumer that's used to list topics.
+        listTopicsConsumer.close();
+
+        // Shut down all of the change logs that were created within this class.
+        for(final QueryChangeLog changeLog : knownChangeLogs.values()) {
+            changeLog.close();
+        }
+    }
+
+    @Override
+    public void subscribe(final SourceListener listener) {
+        requireNonNull(listener);
+        lock.lock();
+        try {
+            // Add the listener to the list of known listeners.
+            listeners.add(listener);
+
+            // Notify it with everything that already exists.
+            for(final Entry<String, QueryChangeLog> entry : knownChangeLogs.entrySet()) {
+                listener.notifyCreate(entry.getKey(), entry.getValue());
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void unsubscribe(final SourceListener listener) {
+        requireNonNull(listener);
+        lock.lock();
+        try {
+            // Remove the listener from the list of known listeners.
+            listeners.remove(listener);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    protected void runOneIteration() throws Exception {
+        lock.lock();
+        try {
+            // Get the list of topics from the Kafka Server.
+            final Set<String> changeLogTopics = new HashSet<>( listTopicsConsumer.listTopics().keySet() );
+
+            // Remove all topics that are not valid Rya Query Change Log topic names.
+            changeLogTopics.removeIf( topic -> !KafkaTopics.getRyaInstance(topic).isPresent() );
+
+            // Extract the Rya instance names from the change log topics.
+            final Set<String> ryaInstances = changeLogTopics.stream()
+                    .map(topic -> KafkaTopics.getRyaInstance(topic).get() )
+                    .collect(Collectors.toSet());
+
+            // Any Rya instances that are in the old set of topics, but not the new one, have been deleted.
+            final Set<String> deletedRyaInstances = new HashSet<>( Sets.difference(knownChangeLogs.keySet(), ryaInstances) );
+
+            // Any Rya instances that are in the new set of topics, but not the old set, have been created.
+            final Set<String> createdRyaInstances = new HashSet<>( Sets.difference(ryaInstances, knownChangeLogs.keySet()) );
+
+            // Handle the deletes.
+            for(final String deletedRyaInstance : deletedRyaInstances) {
+                // Remove the change log from the set of known logs.
+                final QueryChangeLog removed = knownChangeLogs.remove(deletedRyaInstance);
+
+                // Notify the listeners of the update.
+                for(final SourceListener listener : listeners) {
+                    listener.notifyDelete(deletedRyaInstance);
+                }
+
+                // Ensure the change log is closed.
+                removed.close();
+            }
+
+            // Handle the adds.
+            for(final String createdRyaInstance : createdRyaInstances) {
+                // Create and store the ChangeLog.
+                final String changeLogTopic = KafkaTopics.queryChangeLogTopic(createdRyaInstance);
+                final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
+                knownChangeLogs.put(createdRyaInstance, changeLog);
+
+                // Notify the listeners of the update.
+                for(final SourceListener listener : listeners) {
+                    listener.notifyCreate(createdRyaInstance, changeLog);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    protected Scheduler scheduler() {
+        return scheduler;
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java
new file mode 100644
index 0000000..5914b78
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
+/**
+ * Integration tests the methods of {@link KafkaQueryChangeLogSource}.
+ */
+public class KafkaQueryChangeLogSourceIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
+
+    @Before
+    public void clearTopics() throws InterruptedException {
+        kafka.deleteAllTopics();
+    }
+
+    @Test
+    public void discoverExistingLogs() throws Exception {
+        // Create a valid Query Change Log topic.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+        kafka.createTopic(topic);
+
+        // Create the source.
+        final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
+                kafka.getKafkaHostname(),
+                Integer.parseInt( kafka.getKafkaPort() ),
+                Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS));
+
+        // Register a listener that counts down a latch if it sees the new topic.
+        final CountDownLatch created = new CountDownLatch(1);
+        source.subscribe(new SourceListener() {
+            @Override
+            public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+                assertEquals(ryaInstance, ryaInstanceName);
+                created.countDown();
+            }
+
+            @Override
+            public void notifyDelete(final String ryaInstanceName) { }
+        });
+
+        try {
+            // Start the source.
+            source.startAndWait();
+
+            // If the latch isn't counted down, then fail the test.
+            assertTrue( created.await(5, TimeUnit.SECONDS) );
+
+        } finally {
+            source.stopAndWait();
+        }
+    }
+
+    @Test
+    public void discoverNewLogs() throws Exception {
+        // Create the source.
+        final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
+                kafka.getKafkaHostname(),
+                Integer.parseInt( kafka.getKafkaPort() ),
+                Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS));
+
+        // Register a listener that counts down a latch if it sees the new topic.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final CountDownLatch created = new CountDownLatch(1);
+        source.subscribe(new SourceListener() {
+            @Override
+            public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+                assertEquals(ryaInstance, ryaInstanceName);
+                created.countDown();
+            }
+
+            @Override
+            public void notifyDelete(final String ryaInstanceName) { }
+        });
+
+        try {
+            // Start the source.
+            source.startAndWait();
+
+            // Wait twice the polling duration to ensure it iterates at least once.
+            Thread.sleep(200);
+
+            // Create a valid Query Change Log topic.
+            final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+            kafka.createTopic(topic);
+
+            // If the latch isn't counted down, then fail the test.
+            assertTrue( created.await(5, TimeUnit.SECONDS) );
+        } finally {
+            source.stopAndWait();
+        }
+    }
+
+    @Test
+    public void discoverLogDeletions() throws Exception {
+        // Create a valid Query Change Log topic.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+        kafka.createTopic(topic);
+
+        // Create the source.
+        final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
+                kafka.getKafkaHostname(),
+                Integer.parseInt( kafka.getKafkaPort() ),
+                Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS));
+
+        // Register a listener that uses latches to indicate when the topic is created and deleted.
+        final CountDownLatch created = new CountDownLatch(1);
+        final CountDownLatch deleted = new CountDownLatch(1);
+        source.subscribe(new SourceListener() {
+            @Override
+            public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+                assertEquals(ryaInstance, ryaInstanceName);
+                created.countDown();
+            }
+
+            @Override
+            public void notifyDelete(final String ryaInstanceName) {
+                assertEquals(ryaInstance, ryaInstanceName);
+                deleted.countDown();
+            }
+        });
+
+        try {
+            // Start the source
+            source.startAndWait();
+
+            // Wait for it to indicate the topic was created.
+            assertTrue( created.await(5, TimeUnit.SECONDS) );
+
+            // Delete the topic.
+            kafka.deleteTopic(topic);
+
+            // If the latch isn't counted down, then fail the test.
+            assertTrue( deleted.await(5, TimeUnit.SECONDS) );
+
+        } finally {
+            source.stopAndWait();
+        }
+    }
+
+    @Test
+    public void newListenerReceivesAllKnownLogs() throws Exception {
+        // Create a valid Query Change Log topic.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+        kafka.createTopic(topic);
+
+        // Create the source.
+        final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
+                kafka.getKafkaHostname(),
+                Integer.parseInt( kafka.getKafkaPort() ),
+                Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS));
+
+        // Register a listener that counts down a latch if it sees the new topic.
+        final CountDownLatch created = new CountDownLatch(1);
+        source.subscribe(new SourceListener() {
+            @Override
+            public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+                assertEquals(ryaInstance, ryaInstanceName);
+                created.countDown();
+            }
+
+            @Override
+            public void notifyDelete(final String ryaInstanceName) { }
+        });
+
+        try {
+            // Start the source
+            source.startAndWait();
+
+            // Wait for that first listener to indicate the topic was created. This means that one has been cached.
+            assertTrue( created.await(5, TimeUnit.SECONDS) );
+
+            // Register a second listener that counts down when that same topic is encountered. This means the
+            // newly subscribed listener was notified with the already known change log.
+            final CountDownLatch newListenerCreated = new CountDownLatch(1);
+            source.subscribe(new SourceListener() {
+                @Override
+                public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+                    assertEquals(ryaInstance, ryaInstanceName);
+                    newListenerCreated.countDown();
+                }
+
+                @Override
+                public void notifyDelete(final String ryaInstanceName) { }
+            });
+            assertTrue( newListenerCreated.await(5, TimeUnit.SECONDS) );
+
+        } finally {
+            source.stopAndWait();
+        }
+    }
+
+    @Test
+    public void unsubscribedDoesNotReceiveNotifications() throws Exception {
+        // Create the source.
+        final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
+                kafka.getKafkaHostname(),
+                Integer.parseInt( kafka.getKafkaPort() ),
+                Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS));
+
+        try {
+            // Start the source.
+            source.startAndWait();
+
+            // Create a listener that flips a boolean to true when it is notified.
+            final AtomicBoolean notified = new AtomicBoolean(false);
+            final SourceListener listener = new SourceListener() {
+                @Override
+                public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+                    notified.set(true);
+                }
+
+                @Override
+                public void notifyDelete(final String ryaInstanceName) {
+                    notified.set(true);
+                }
+            };
+
+            // Register and then unregister it.
+            source.subscribe(listener);
+            source.unsubscribe(listener);
+
+            // Create a topic.
+            final String ryaInstance = UUID.randomUUID().toString();
+            final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+            kafka.createTopic(topic);
+
+            //Wait longer than the polling time for the listener to be notified.
+            Thread.sleep(300);
+
+            // Show the boolean was never flipped to true.
+            assertFalse(notified.get());
+        } finally {
+            source.stopAndWait();
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
index c7c5929..3810d4f 100644
--- a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
@@ -75,6 +75,7 @@
         brokerProps.setProperty(KafkaConfig$.MODULE$.PortProp(), brokerPort);
         brokerProps.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), zookeperConnect);
         brokerProps.setProperty(KafkaConfig$.MODULE$.LogDirsProp(), Files.createTempDirectory(getClass().getSimpleName() + "-").toAbsolutePath().toString());
+        brokerProps.setProperty(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), "true");
         final KafkaConfig config = new KafkaConfig(brokerProps);
         final Time mock = new MockTime();
         kafkaServer = TestUtils.createServer(config, mock);
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
index 252c288..50ba4ea 100644
--- a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
@@ -19,9 +19,15 @@
 package org.apache.rya.test.kafka;
 
 import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
 
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,6 +103,52 @@
     }
 
     /**
+     * Marks a topic for deletion. You may have to wait some time for the delete to actually complete.
+     *
+     * @param topicName - The topic that will be deleted. (not null)
+     */
+    public void deleteTopic(final String topicName) {
+        ZkUtils zkUtils = null;
+        try {
+            logger.info("Deleting Kafka Topic: '{}'", topicName);
+            zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false);
+            AdminUtils.deleteTopic(zkUtils, topicName);
+        }
+        finally {
+            if(zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+    }
+
+    /**
+     * Delete all of the topics that are in the embedded Kafka instance.
+     *
+     * @throws InterruptedException Interrupted while waiting for the topics to be deleted.
+     */
+    public void deleteAllTopics() throws InterruptedException {
+        // Setup the consumer that is used to list topics for the source.
+        final Properties consumerProperties = createBootstrapServerConfig();
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+        try(final Consumer<String, String> listTopicsConsumer = new KafkaConsumer<>(consumerProperties)) {
+            // Mark all existing topics for deletion.
+            Set<String> topics = listTopicsConsumer.listTopics().keySet();
+            for(final String topic : topics) {
+                deleteTopic(topic);
+            }
+
+            // Loop and wait until they are all gone.
+            while(!topics.isEmpty()) {
+                Thread.sleep(100);
+                topics = listTopicsConsumer.listTopics().keySet();
+            }
+        }
+    }
+
+    /**
      * @return A new Property object containing the correct value for Kafka's
      *         {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}.
      */