RYA-450 Implemented a Kafka backed QueryChangeLogSource.
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index 3e0df50..095465c 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,6 +20,7 @@
import static java.util.Objects.requireNonNull;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
@@ -40,6 +41,8 @@
@DefaultAnnotation(NonNull.class)
public class KafkaTopics {
+ private static final String QUERY_CHANGE_LOG_TOPIC_SUFFIX = "-QueryChangeLog";
+
/**
* Creates the Kafka topic name that is used for a specific instance of Rya's {@link QueryChangeLog}.
*
@@ -48,7 +51,28 @@
*/
public static String queryChangeLogTopic(final String ryaInstance) {
requireNonNull(ryaInstance);
- return ryaInstance + "-QueryChangeLog";
+ return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX;
+ }
+
+ /**
+ * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}.
+ * <p/>
+ * This is the inverse function of {@link #queryChangeLogTopic(String)}.
+ *
+ * @param changeLogTopic - The topic to evaluate. (not null)
+ * @return If the topic is well formatted, then the Rya instance name that was part of the topic name.
+ */
+ public static Optional<String> getRyaInstance(final String changeLogTopic) {
+ requireNonNull(changeLogTopic);
+
+ // Return absent if the provided topic does not represent a query change log topic.
+ if(!changeLogTopic.endsWith(QUERY_CHANGE_LOG_TOPIC_SUFFIX)) {
+ return Optional.empty();
+ }
+
+ // Everything before the suffix is the Rya instance name.
+ final int endIndex = changeLogTopic.length() - QUERY_CHANGE_LOG_TOPIC_SUFFIX.length();
+ return Optional.of( changeLogTopic.substring(0, endIndex) );
}
/**
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
new file mode 100644
index 0000000..a057de7
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Optional;
+
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link KafkaTopics}.
+ */
+public class KafkaTopicsTest {
+
+ @Test
+ public void getRyaInstance_wellFormattedTopic() {
+ // Make a topic name using a Rya instance name.
+ final String ryaInstance = "test";
+ final String topicName = KafkaTopics.queryChangeLogTopic(ryaInstance);
+
+ // Show the rya instance name is able to be extracted from the topic.
+ final Optional<String> resolvedRyaInstance = KafkaTopics.getRyaInstance(topicName);
+ assertEquals(ryaInstance, resolvedRyaInstance.get());
+ }
+
+ @Test
+ public void getRyaInstance_invalidTopicName() {
+ // Make up an invalid topic name.
+ final String invalidTopic = "thisIsABadTopicName";
+
+ // Show there is no Rya Instance name in it.
+ final Optional<String> ryaInstance = KafkaTopics.getRyaInstance(invalidTopic);
+ assertFalse( ryaInstance.isPresent() );
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/pom.xml b/extras/rya.streams/query-manager/pom.xml
index 76e521d..d321ab5 100644
--- a/extras/rya.streams/query-manager/pom.xml
+++ b/extras/rya.streams/query-manager/pom.xml
@@ -37,15 +37,32 @@
<!-- Rya dependencies -->
<dependency>
<groupId>org.apache.rya</groupId>
- <artifactId>rya.streams.client</artifactId>
+ <artifactId>rya.streams.kafka</artifactId>
</dependency>
-
+
<!-- Apache Daemon dependencies -->
<dependency>
<groupId>commons-daemon</groupId>
<artifactId>commons-daemon</artifactId>
<version>1.1.0</version>
</dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.test.kafka</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<!-- Add the XSD directory as a resource so that it will be packaged in the jar.
@@ -121,4 +138,4 @@
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
new file mode 100644
index 0000000..32305f5
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Represents a Kafka Server that manages {@link KafkaQueryChangeLog}s.
+ * <p/>
+ * Thread safe.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaQueryChangeLogSource extends AbstractScheduledService implements QueryChangeLogSource {
+
+ /**
+ * Ensures thread safe interactions with this object.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * Used by the service to configure how often it polls the Kafka Server for topics.
+ */
+ private final Scheduler scheduler;
+
+ /**
+ * Which Kafka Server this source represents.
+ */
+ private final String kafkaBootstrapServer;
+
+ /**
+ * Listeners that need to be notified when logs are created/deleted.
+ */
+ private final Set<SourceListener> listeners = new HashSet<>();
+
+ /**
+ * Maps Rya instance name to a Query Change Log for that instance. This map is used to keep
+ * track of how the change logs change over time within the Kafka Server.
+ */
+ private final HashMap<String, QueryChangeLog> knownChangeLogs = new HashMap<>();
+
+ /**
+ * A consumer that is used to poll the Kafka Server for topics.
+ */
+ private KafkaConsumer<String, String> listTopicsConsumer;
+
+ /**
+ * Constructs an instance of {@link KafkaQueryChangeLogSource}.
+ *
+ * @param kafkaHostname - The hostname of the Kafka Server that is the source. (not null)
+ * @param kafkaPort - The port of the Kafka Server that is the source. (not null)
+ * @param scheduler - How frequently this source will poll the Kafka Server for topics. (not null)
+ */
+ public KafkaQueryChangeLogSource(
+ final String kafkaHostname,
+ final int kafkaPort,
+ final Scheduler scheduler) {
+ kafkaBootstrapServer = requireNonNull(kafkaHostname) + ":" + kafkaPort;
+ this.scheduler = requireNonNull(scheduler);
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ // Setup the consumer that is used to list topics for the source.
+ final Properties consumerProperties = new Properties();
+ consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ listTopicsConsumer = new KafkaConsumer<>(consumerProperties);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // Shut down the consumer that's used to list topics.
+ listTopicsConsumer.close();
+
+ // Shut down all of the change logs that were created within this class.
+ for(final QueryChangeLog changeLog : knownChangeLogs.values()) {
+ changeLog.close();
+ }
+ }
+
+ @Override
+ public void subscribe(final SourceListener listener) {
+ requireNonNull(listener);
+ lock.lock();
+ try {
+ // Add the listener to the list of known listeners.
+ listeners.add(listener);
+
+ // Notify it with everything that already exists.
+ for(final Entry<String, QueryChangeLog> entry : knownChangeLogs.entrySet()) {
+ listener.notifyCreate(entry.getKey(), entry.getValue());
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void unsubscribe(final SourceListener listener) {
+ requireNonNull(listener);
+ lock.lock();
+ try {
+ // Remove the listener from the list of known listeners.
+ listeners.remove(listener);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ protected void runOneIteration() throws Exception {
+ lock.lock();
+ try {
+ // Get the list of topics from the Kafka Server.
+ final Set<String> changeLogTopics = new HashSet<>( listTopicsConsumer.listTopics().keySet() );
+
+ // Remove all topics that are not valid Rya Query Change Log topic names.
+ changeLogTopics.removeIf( topic -> !KafkaTopics.getRyaInstance(topic).isPresent() );
+
+ // Extract the Rya instance names from the change log topics.
+ final Set<String> ryaInstances = changeLogTopics.stream()
+ .map(topic -> KafkaTopics.getRyaInstance(topic).get() )
+ .collect(Collectors.toSet());
+
+ // Any Rya instances that are in the old set of topics, but not the new one, have been deleted.
+ final Set<String> deletedRyaInstances = new HashSet<>( Sets.difference(knownChangeLogs.keySet(), ryaInstances) );
+
+ // Any Rya instances that are in the new set of topics, but not the old set, have been created.
+ final Set<String> createdRyaInstances = new HashSet<>( Sets.difference(ryaInstances, knownChangeLogs.keySet()) );
+
+ // Handle the deletes.
+ for(final String deletedRyaInstance : deletedRyaInstances) {
+ // Remove the change log from the set of known logs.
+ final QueryChangeLog removed = knownChangeLogs.remove(deletedRyaInstance);
+
+ // Notify the listeners of the update.
+ for(final SourceListener listener : listeners) {
+ listener.notifyDelete(deletedRyaInstance);
+ }
+
+ // Ensure the change log is closed.
+ removed.close();
+ }
+
+ // Handle the adds.
+ for(final String createdRyaInstance : createdRyaInstances) {
+ // Create and store the ChangeLog.
+ final String changeLogTopic = KafkaTopics.queryChangeLogTopic(createdRyaInstance);
+ final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
+ knownChangeLogs.put(createdRyaInstance, changeLog);
+
+ // Notify the listeners of the update.
+ for(final SourceListener listener : listeners) {
+ listener.notifyCreate(createdRyaInstance, changeLog);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ protected Scheduler scheduler() {
+ return scheduler;
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java
new file mode 100644
index 0000000..5914b78
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
+/**
+ * Integration tests the methods of {@link KafkaQueryChangeLogSource}.
+ */
+public class KafkaQueryChangeLogSourceIT {
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
+
+ @Before
+ public void clearTopics() throws InterruptedException {
+ kafka.deleteAllTopics();
+ }
+
+ @Test
+ public void discoverExistingLogs() throws Exception {
+ // Create a valid Query Change Log topic.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+ kafka.createTopic(topic);
+
+ // Create the source.
+ final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
+ kafka.getKafkaHostname(),
+ Integer.parseInt( kafka.getKafkaPort() ),
+ Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS));
+
+ // Register a listener that counts down a latch if it sees the new topic.
+ final CountDownLatch created = new CountDownLatch(1);
+ source.subscribe(new SourceListener() {
+ @Override
+ public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+ assertEquals(ryaInstance, ryaInstanceName);
+ created.countDown();
+ }
+
+ @Override
+ public void notifyDelete(final String ryaInstanceName) { }
+ });
+
+ try {
+ // Start the source.
+ source.startAndWait();
+
+ // If the latch isn't counted down, then fail the test.
+ assertTrue( created.await(5, TimeUnit.SECONDS) );
+
+ } finally {
+ source.stopAndWait();
+ }
+ }
+
+ @Test
+ public void discoverNewLogs() throws Exception {
+ // Create the source.
+ final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
+ kafka.getKafkaHostname(),
+ Integer.parseInt( kafka.getKafkaPort() ),
+ Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS));
+
+ // Register a listener that counts down a latch if it sees the new topic.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final CountDownLatch created = new CountDownLatch(1);
+ source.subscribe(new SourceListener() {
+ @Override
+ public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+ assertEquals(ryaInstance, ryaInstanceName);
+ created.countDown();
+ }
+
+ @Override
+ public void notifyDelete(final String ryaInstanceName) { }
+ });
+
+ try {
+ // Start the source.
+ source.startAndWait();
+
+ // Wait twice the polling duration to ensure it iterates at least once.
+ Thread.sleep(200);
+
+ // Create a valid Query Change Log topic.
+ final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+ kafka.createTopic(topic);
+
+ // If the latch isn't counted down, then fail the test.
+ assertTrue( created.await(5, TimeUnit.SECONDS) );
+ } finally {
+ source.stopAndWait();
+ }
+ }
+
+ @Test
+ public void discoverLogDeletions() throws Exception {
+ // Create a valid Query Change Log topic.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+ kafka.createTopic(topic);
+
+ // Create the source.
+ final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
+ kafka.getKafkaHostname(),
+ Integer.parseInt( kafka.getKafkaPort() ),
+ Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS));
+
+ // Register a listener that uses latches to indicate when the topic is created and deleted.
+ final CountDownLatch created = new CountDownLatch(1);
+ final CountDownLatch deleted = new CountDownLatch(1);
+ source.subscribe(new SourceListener() {
+ @Override
+ public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+ assertEquals(ryaInstance, ryaInstanceName);
+ created.countDown();
+ }
+
+ @Override
+ public void notifyDelete(final String ryaInstanceName) {
+ assertEquals(ryaInstance, ryaInstanceName);
+ deleted.countDown();
+ }
+ });
+
+ try {
+ // Start the source
+ source.startAndWait();
+
+ // Wait for it to indicate the topic was created.
+ assertTrue( created.await(5, TimeUnit.SECONDS) );
+
+ // Delete the topic.
+ kafka.deleteTopic(topic);
+
+ // If the latch isn't counted down, then fail the test.
+ assertTrue( deleted.await(5, TimeUnit.SECONDS) );
+
+ } finally {
+ source.stopAndWait();
+ }
+ }
+
+ @Test
+ public void newListenerReceivesAllKnownLogs() throws Exception {
+ // Create a valid Query Change Log topic.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+ kafka.createTopic(topic);
+
+ // Create the source.
+ final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
+ kafka.getKafkaHostname(),
+ Integer.parseInt( kafka.getKafkaPort() ),
+ Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS));
+
+ // Register a listener that counts down a latch if it sees the new topic.
+ final CountDownLatch created = new CountDownLatch(1);
+ source.subscribe(new SourceListener() {
+ @Override
+ public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+ assertEquals(ryaInstance, ryaInstanceName);
+ created.countDown();
+ }
+
+ @Override
+ public void notifyDelete(final String ryaInstanceName) { }
+ });
+
+ try {
+ // Start the source
+ source.startAndWait();
+
+ // Wait for that first listener to indicate the topic was created. This means that one has been cached.
+ assertTrue( created.await(5, TimeUnit.SECONDS) );
+
+ // Register a second listener that counts down when that same topic is encountered. This means the
+ // newly subscribed listener was notified with the already known change log.
+ final CountDownLatch newListenerCreated = new CountDownLatch(1);
+ source.subscribe(new SourceListener() {
+ @Override
+ public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+ assertEquals(ryaInstance, ryaInstanceName);
+ newListenerCreated.countDown();
+ }
+
+ @Override
+ public void notifyDelete(final String ryaInstanceName) { }
+ });
+ assertTrue( newListenerCreated.await(5, TimeUnit.SECONDS) );
+
+ } finally {
+ source.stopAndWait();
+ }
+ }
+
+ @Test
+ public void unsubscribedDoesNotReceiveNotifications() throws Exception {
+ // Create the source.
+ final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
+ kafka.getKafkaHostname(),
+ Integer.parseInt( kafka.getKafkaPort() ),
+ Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS));
+
+ try {
+ // Start the source.
+ source.startAndWait();
+
+ // Create a listener that flips a boolean to true when it is notified.
+ final AtomicBoolean notified = new AtomicBoolean(false);
+ final SourceListener listener = new SourceListener() {
+ @Override
+ public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+ notified.set(true);
+ }
+
+ @Override
+ public void notifyDelete(final String ryaInstanceName) {
+ notified.set(true);
+ }
+ };
+
+ // Register and then unregister it.
+ source.subscribe(listener);
+ source.unsubscribe(listener);
+
+ // Create a topic.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+ kafka.createTopic(topic);
+
+ //Wait longer than the polling time for the listener to be notified.
+ Thread.sleep(300);
+
+ // Show the boolean was never flipped to true.
+ assertFalse(notified.get());
+ } finally {
+ source.stopAndWait();
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
index c7c5929..3810d4f 100644
--- a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
@@ -75,6 +75,7 @@
brokerProps.setProperty(KafkaConfig$.MODULE$.PortProp(), brokerPort);
brokerProps.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), zookeperConnect);
brokerProps.setProperty(KafkaConfig$.MODULE$.LogDirsProp(), Files.createTempDirectory(getClass().getSimpleName() + "-").toAbsolutePath().toString());
+ brokerProps.setProperty(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), "true");
final KafkaConfig config = new KafkaConfig(brokerProps);
final Time mock = new MockTime();
kafkaServer = TestUtils.createServer(config, mock);
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
index 252c288..50ba4ea 100644
--- a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
@@ -19,9 +19,15 @@
package org.apache.rya.test.kafka;
import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +103,52 @@
}
/**
+ * Marks a topic for deletion. You may have to wait some time for the delete to actually complete.
+ *
+ * @param topicName - The topic that will be deleted. (not null)
+ */
+ public void deleteTopic(final String topicName) {
+ ZkUtils zkUtils = null;
+ try {
+ logger.info("Deleting Kafka Topic: '{}'", topicName);
+ zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false);
+ AdminUtils.deleteTopic(zkUtils, topicName);
+ }
+ finally {
+ if(zkUtils != null) {
+ zkUtils.close();
+ }
+ }
+ }
+
+ /**
+ * Delete all of the topics that are in the embedded Kafka instance.
+ *
+ * @throws InterruptedException Interrupted while waiting for the topics to be deleted.
+ */
+ public void deleteAllTopics() throws InterruptedException {
+ // Setup the consumer that is used to list topics for the source.
+ final Properties consumerProperties = createBootstrapServerConfig();
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+ try(final Consumer<String, String> listTopicsConsumer = new KafkaConsumer<>(consumerProperties)) {
+ // Mark all existing topics for deletion.
+ Set<String> topics = listTopicsConsumer.listTopics().keySet();
+ for(final String topic : topics) {
+ deleteTopic(topic);
+ }
+
+ // Loop and wait until they are all gone.
+ while(!topics.isEmpty()) {
+ Thread.sleep(100);
+ topics = listTopicsConsumer.listTopics().keySet();
+ }
+ }
+ }
+
+ /**
* @return A new Property object containing the correct value for Kafka's
* {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}.
*/