RYA-451 Fixing threading issues with the QueryManager class.
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
index 7194834..11423bd 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
@@ -85,7 +85,7 @@
}
return false;
}
-
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
@@ -95,7 +95,7 @@
sb.append(getSparql() + "\n");
sb.append("Is ");
if (!isActive) {
- sb.append(" Not ");
+ sb.append("Not ");
}
sb.append("Running.\n");
return sb.toString();
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index 5fb0297..95c1922 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -51,9 +51,9 @@
*/
@DefaultAnnotation(NonNull.class)
public class InMemoryQueryRepository extends AbstractScheduledService implements QueryRepository {
- private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class);
+ private static final Logger log = LoggerFactory.getLogger(InMemoryQueryRepository.class);
- private final ReentrantLock lock = new ReentrantLock();
+ private final ReentrantLock lock = new ReentrantLock(true);
/**
* The change log that is the ground truth for describing what the queries look like.
@@ -198,7 +198,6 @@
@Override
protected void shutDown() throws Exception {
- super.shutDown();
lock.lock();
try {
changeLog.close();
@@ -211,11 +210,12 @@
* Updates the {@link #queriesCache} to reflect the latest position within the {@link #changeLog}.
*/
private void updateCache() {
- requireNonNull(changeLog);
+ log.trace("updateCache() - Enter");
CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> it = null;
try {
// Iterate over everything since the last position that was handled within the change log.
+ log.debug("Starting cache position:" + cachePosition);
if(cachePosition.isPresent()) {
it = changeLog.readFromPosition(cachePosition.get() + 1);
} else {
@@ -228,6 +228,8 @@
final QueryChange change = entry.getEntry();
final UUID queryId = change.getQueryId();
+ log.debug("Updating the cache to reflect:\n" + change);
+
switch(change.getChangeType()) {
case CREATE:
final StreamsQuery query = new StreamsQuery(
@@ -253,15 +255,17 @@
break;
}
+ log.debug("Notifying listeners with the updated state.");
final Optional<StreamsQuery> newQueryState = Optional.ofNullable(queriesCache.get(queryId));
listeners.forEach(listener -> listener.notify(entry, newQueryState));
cachePosition = Optional.of( entry.getPosition() );
+ log.debug("New chache position: " + cachePosition);
}
} catch (final QueryChangeLogException e) {
// Rethrow the exception because the object the supplier tried to create could not be created.
- throw new RuntimeException("Could not initialize the " + InMemoryQueryRepository.class.getName(), e);
+ throw new RuntimeException("Could not update the cache of " + InMemoryQueryRepository.class.getName(), e);
} finally {
// Try to close the iteration if it was opened.
@@ -270,18 +274,22 @@
it.close();
}
} catch (final QueryChangeLogException e) {
- LOG.error("Could not close the " + CloseableIteration.class.getName(), e);
+ log.error("Could not close the " + CloseableIteration.class.getName(), e);
}
+
+ log.trace("updateCache() - Exit");
}
}
@Override
protected void runOneIteration() throws Exception {
+ log.trace("runOneIteration() - Enter");
lock.lock();
try {
updateCache();
} finally {
lock.unlock();
+ log.trace("runOneIteration() - Exit");
}
}
@@ -292,17 +300,25 @@
@Override
public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener) {
+ log.trace("subscribe(listener) - Enter");
+
//locks to prevent the current state from changing while subscribing.
lock.lock();
+ log.trace("subscribe(listener) - Acquired lock");
try {
listeners.add(listener);
+ log.trace("subscribe(listener) - Listener Registered");
//return the current state of the query repository
- return queriesCache.values()
+ final Set<StreamsQuery> queries = queriesCache.values()
.stream()
.collect(Collectors.toSet());
+ log.trace("subscribe(listener) - Returning " + queries.size() + " existing queries");
+ return queries;
} finally {
+ log.trace("subscribe(listener) - Releasing lock");
lock.unlock();
+ log.trace("subscribe(listener) - Exit");
}
}
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
index d283957..d34a394 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
@@ -110,6 +110,16 @@
return false;
}
+ @Override
+ public String toString() {
+ return "QueryChange: {" +
+ " Query ID: " + queryId + ",\n" +
+ " Change Type: " + changeType + ",\n" +
+ " Is Active: " + isActive + ",\n" +
+ " SPARQL: " + sparql + "\n" +
+ "}";
+ }
+
/**
* Create a {@link QueryChange} that represents a new SPARQL query that will be managed by Rya Streams.
*
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 3b3d48a..d7e116b 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -93,13 +93,9 @@
// Create a new totally in memory QueryRepository.
final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE );
- try {
- // Listing the queries should work using an initialized change log.
- final Set<StreamsQuery> stored = initializedQueries.list();
- assertEquals(expected, stored);
- } finally {
- queries.stop();
- }
+ // Listing the queries should work using an initialized change log.
+ final Set<StreamsQuery> stored = initializedQueries.list();
+ assertEquals(expected, stored);
} finally {
queries.stop();
}
@@ -166,7 +162,7 @@
final StreamsQuery query = queries.add("query 1", true);
final Set<StreamsQuery> existing = queries.subscribe((queryChangeEvent, newQueryState) -> {
- final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L,
+ final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(1L,
QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
final Optional<StreamsQuery> expectedQueryState = Optional.of(
new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
@@ -197,7 +193,7 @@
//show listener on repo that query was added to is being notified of the new query.
final CountDownLatch repo1Latch = new CountDownLatch(1);
queries.subscribe((queryChangeEvent, newQueryState) -> {
- final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+ final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
final Optional<StreamsQuery> expectedQueryState = Optional.of(
new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
@@ -210,7 +206,7 @@
//show listener not on the repo that query was added to is being notified as well.
final CountDownLatch repo2Latch = new CountDownLatch(1);
queries2.subscribe((queryChangeEvent, newQueryState) -> {
- final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+ final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
final Optional<StreamsQuery> expectedQueryState = Optional.of(
new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index cd78975..a5507a6 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -23,6 +23,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.api.interactor.ListQueries;
@@ -112,12 +113,11 @@
sb.append("Queries in Rya Streams:\n");
sb.append("---------------------------------------------------------\n");
queries.forEach(query -> {
- sb.append("ID: ");
- sb.append(query.getQueryId());
- sb.append("\t\t");
- sb.append("Query: ");
- sb.append(query.getSparql());
- sb.append("\n");
+ sb.append("ID: ").append(query.getQueryId())
+ .append(" ")
+ .append("Is Active: ").append(query.isActive())
+ .append(StringUtils.rightPad("" + query.isActive(), 9))
+ .append("Query: ").append(query.getSparql()).append("\n");
});
return sb.toString();
}
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index ddaf647..7b311f6 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -136,7 +136,7 @@
final Set<String> topics = new HashSet<>();
topics.add( KafkaTopics.statementsTopic(params.ryaInstance) );
topics.add( KafkaTopics.queryResultsTopic(queryId) );
- KafkaTopics.createTopic(params.zookeeperServers, topics, 1, 1);
+ KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1);
// Run the query that uses those topics.
final KafkaRunQuery runQuery = new KafkaRunQuery(
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index 095465c..989799a 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -55,7 +55,7 @@
}
/**
- * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}.
+ * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChangeLog}.
* <p/>
* This is the inverse function of {@link #queryChangeLogTopic(String)}.
*
@@ -106,7 +106,7 @@
* @param partitions - The number of partitions that each of the topics will have.
* @param replicationFactor - The replication factor of the topics that are created.
*/
- public static void createTopic(
+ public static void createTopics(
final String zookeeperServers,
final Set<String> topicNames,
final int partitions,
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
index 7ab7e90..8093951 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
@@ -83,7 +83,7 @@
try {
final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory());
return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));
- } catch (MalformedQueryException | TopologyBuilderException e) {
+ } catch (final MalformedQueryException | TopologyBuilderException e) {
throw new KafkaStreamsFactoryException("Could not create a KafkaStreams processing topology for query " + query.getQueryId(), e);
}
}
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
new file mode 100644
index 0000000..771e1c8
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Set;
+
+import org.apache.rya.streams.kafka.KafkaTopics;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates topics in Kafka.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CreateKafkaTopic {
+
+ private final String zookeeperServers;
+
+ /**
+ * Constructs an instance of {@link CreateKafkaTopic}.
+ *
+ * @param zookeeperServers - The Zookeeper servers that are used to manage the Kafka instance. (not null)
+ */
+ public CreateKafkaTopic(final String zookeeperServers) {
+ this.zookeeperServers = requireNonNull(zookeeperServers);
+ }
+
+ /**
+ * Creates a set of Kafka topics for each topic that does not already exist.
+ *
+ * @param topicNames - The topics that will be created. (not null)
+ * @param partitions - The number of partitions that each of the topics will have.
+ * @param replicationFactor - The replication factor of the topics that are created.
+ */
+ public void createTopics(
+ final Set<String> topicNames,
+ final int partitions,
+ final int replicationFactor) {
+ KafkaTopics.createTopics(zookeeperServers, topicNames, partitions, replicationFactor);
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/main/README.txt b/extras/rya.streams/query-manager/src/main/README.txt
index 3b2dbfe..93d5ac5 100644
--- a/extras/rya.streams/query-manager/src/main/README.txt
+++ b/extras/rya.streams/query-manager/src/main/README.txt
@@ -41,8 +41,11 @@
yum install -y ${project.artifactId}-${project.version}.noarch.rpm
3. Update the configuration file:
- Replace "[Kafka Broker Hostname]" with the IP address of the Kafka broker.
- Replace the Kafka port if using something other than the default of 9092.
+ A. Replace "[Zookeepers used to manage Kafka. E.g.: zoo1,zoo2,zoo3]" with
+ the zookeepers used to manage the Kafka cluster. It is a comma separated
+ list.
+ B. Replace "[Kafka Broker Hostname]" with the IP address of the Kafka broker.
+ C. Replace the Kafka port if using something other than the default of 9092.
4. Start the service:
systemctl start rya-streams-query-manager.service
diff --git a/extras/rya.streams/query-manager/src/main/config/configuration.xml b/extras/rya.streams/query-manager/src/main/config/configuration.xml
index 96da501..7077125 100644
--- a/extras/rya.streams/query-manager/src/main/config/configuration.xml
+++ b/extras/rya.streams/query-manager/src/main/config/configuration.xml
@@ -18,7 +18,7 @@
under the License.
-->
<queryManagerConfig>
- <!-- The Query Change Log Sources. The source defines a system where Rya
+ <!-- The Query Change Log Sources. The source defines a system where Rya
- Streams Query Change Logs are managed. The query manager will manage
- queries for all Rya instances whose change logs are stored within the
- source.
@@ -29,6 +29,14 @@
<port>9092</port>
</kafka>
</queryChangeLogSource>
+
+ <!-- The Query Executor. The executor defines a system for executing the
+ Rya Streams queries. -->
+ <queryExecutor>
+ <localKafkaStreams>
+ <zookeepers>[Zookeepers used to manage Kafka. E.g.: zoo1,zoo2,zoo3]</zookeepers>
+ </localKafkaStreams>
+ </queryExecutor>
<!-- This section defines performance related tuning values. Sensible
- default have been provided to simplify configuration.
diff --git a/extras/rya.streams/query-manager/src/main/config/log4j.xml b/extras/rya.streams/query-manager/src/main/config/log4j.xml
index 2021638..96ea56c 100644
--- a/extras/rya.streams/query-manager/src/main/config/log4j.xml
+++ b/extras/rya.streams/query-manager/src/main/config/log4j.xml
@@ -28,6 +28,23 @@
</layout>
</appender>
+ <!-- Kafka configuration configs are loud. -->
+ <logger name="org.apache.kafka.streams.StreamsConfig">
+ <level value="OFF"/>
+ </logger>
+ <logger name="org.apache.kafka.clients.consumer.ConsumerConfig">
+ <level value="OFF"/>
+ </logger>
+ <logger name="org.apache.kafka.clients.producer.ProducerConfig">
+ <level value="OFF"/>
+ </logger>
+
+ <!-- Change this level to DEBUG to see more information about what the
+ QueryManager is doing. -->
+ <logger name="org.apache.rya.streams.querymanager.QueryManager">
+ <level value="INFO"/>
+ </logger>
+
<root>
<level value="INFO" />
<appender-ref ref="console" />
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
index 73e5d12..eb5ca08 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
@@ -51,8 +51,9 @@
public void unsubscribe(final SourceListener listener);
/**
- * A listener that is notified when a {@link QueryChangeLog} has
- * been added or removed from a {@link QueryChangeLogSource}.
+ * A listener that is notified when a {@link QueryChangeLog} has been added or
+ * removed from a {@link QueryChangeLogSource}. The listener receives the only
+ * copy of the change log and is responsible for shutting it down.
*/
@DefaultAnnotation(NonNull.class)
public interface SourceListener {
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
index 30b4538..e6bd800 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
@@ -1,18 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.rya.streams.querymanager;
@@ -20,16 +22,23 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.queries.ChangeLogEntry;
import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
import org.apache.rya.streams.api.queries.QueryChange;
-import org.apache.rya.streams.api.queries.QueryChange.ChangeType;
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryChangeLogListener;
import org.apache.rya.streams.api.queries.QueryRepository;
@@ -38,8 +47,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -51,205 +62,823 @@
* instances/rya streams instances.
*/
@DefaultAnnotation(NonNull.class)
-public class QueryManager extends AbstractIdleService {
- private static final Logger LOG = LoggerFactory.getLogger(QueryManager.class);
-
- private final QueryExecutor queryExecutor;
- private final Scheduler scheduler;
+public class QueryManager extends AbstractService {
+ private static final Logger log = LoggerFactory.getLogger(QueryManager.class);
/**
- * Map of Rya Instance name to {@link QueryRepository}.
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific
+ * Rya instnace.
*/
- private final Map<String, QueryRepository> queryRepos = new HashMap<>();
+ private final QueryChangeLogSource changeLogSource;
- private final ReentrantLock lock = new ReentrantLock();
+ /**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+ private final QueryExecutor queryExecutor;
- private final QueryChangeLogSource source;
+ /**
+ * How long blocking operations will be attempted before potentially trying again.
+ */
+ private final long blockingValue;
+
+ /**
+ * The units for {@link #blockingValue}.
+ */
+ private final TimeUnit blockingUnits;
+
+ /**
+ * Used to inform threads that the application is shutting down, so they must stop work.
+ */
+ private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ /**
+ * This thread pool manages the two thread used to work the {@link LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+ private final ExecutorService executor = Executors.newFixedThreadPool(2);
/**
* Creates a new {@link QueryManager}.
*
* @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null)
* @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null)
- * @param scheduler - The {@link Scheduler} used to discover query changes
- * within the {@link QueryChangeLog}s (not null)
+ * @param blockingValue - How long blocking operations will try before looping. (> 0)
+ * @param blockingUnits - The units of the {@code blockingValue}. (not null)
*/
- public QueryManager(final QueryExecutor queryExecutor, final QueryChangeLogSource source, final Scheduler scheduler) {
- this.source = requireNonNull(source);
+ public QueryManager(
+ final QueryExecutor queryExecutor,
+ final QueryChangeLogSource source,
+ final long blockingValue,
+ final TimeUnit blockingUnits) {
+ this.changeLogSource = requireNonNull(source);
this.queryExecutor = requireNonNull(queryExecutor);
- this.scheduler = requireNonNull(scheduler);
- }
-
- /**
- * Starts running a query.
- *
- * @param ryaInstanceName - The Rya instance the query belongs to. (not null)
- * @param query - The query to run.(not null)
- */
- private void runQuery(final String ryaInstanceName, final StreamsQuery query) {
- requireNonNull(ryaInstanceName);
- requireNonNull(query);
- LOG.info("Starting Query: " + query.toString() + " on the rya instance: " + ryaInstanceName);
-
- try {
- queryExecutor.startQuery(ryaInstanceName, query);
- } catch (final QueryExecutorException e) {
- LOG.error("Failed to start query.", e);
- }
- }
-
- /**
- * Stops the specified query from running.
- *
- * @param queryId - The ID of the query to stop running. (not null)
- */
- private void stopQuery(final UUID queryId) {
- requireNonNull(queryId);
-
- LOG.info("Stopping query: " + queryId.toString());
-
- try {
- queryExecutor.stopQuery(queryId);
- } catch (final QueryExecutorException e) {
- LOG.error("Failed to stop query.", e);
- }
+ Preconditions.checkArgument(blockingValue > 0, "The blocking value must be > 0. Was: " + blockingValue);
+ this.blockingValue = blockingValue;
+ this.blockingUnits = requireNonNull(blockingUnits);
}
@Override
- protected void startUp() throws Exception {
- lock.lock();
+ protected void doStart() {
+ log.info("Starting a QueryManager.");
+
+ // A work queue of discovered Query Change Logs that need to be handled.
+ // This queue exists so that the source notifying thread may be released
+ // immediately instead of calling into blocking functions.
+ final BlockingQueue<LogEvent> logEvents = new ArrayBlockingQueue<>(1024);
+
+ // A work queue of discovered Query Changes from the monitored Query Change Logs
+ // that need to be handled. This queue exists so that the Query Repository notifying
+ // thread may be released immediately instead of calling into blocking functions.
+ final BlockingQueue<QueryEvent> queryEvents = new ArrayBlockingQueue<>(1024);
+
try {
- LOG.info("Starting Query Manager.");
+ // Start up a LogEventWorker using the executor service.
+ executor.submit(new LogEventWorker(logEvents, queryEvents, blockingValue, blockingUnits, shutdownSignal));
+
+ // Start up a QueryEvent Worker using the executor service.
+ executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal));
+
+ // Start up the query execution framework.
queryExecutor.startAndWait();
- source.startAndWait();
- // subscribe to the sources to be notified of changes.
- source.subscribe(new QueryManagerSourceListener());
- } finally {
- lock.unlock();
+ // Startup the source that discovers new Query Change Logs.
+ changeLogSource.startAndWait();
+
+ // Subscribe the source a listener that writes to the LogEventWorker's work queue.
+ changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal));
+ } catch(final RejectedExecutionException | UncheckedExecutionException e) {
+ log.error("Could not start up a QueryManager.", e);
+ notifyFailed(e);
}
+
+ // Notify the service was successfully started.
+ notifyStarted();
+
+ log.info("QueryManager has finished starting.");
}
@Override
- protected void shutDown() throws Exception {
- lock.lock();
+ protected void doStop() {
+ log.info("Stopping a QueryManager.");
+
+ // Set the shutdown flag so that all components that rely on that signal will stop processing.
+ shutdownSignal.set(true);
+
+ // Stop the workers and wait for them to die.
+ executor.shutdownNow();
try {
- LOG.info("Stopping Query Manager.");
- source.stopAndWait();
- queryExecutor.stopAndWait();
- } finally {
- lock.unlock();
+ if(!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ log.warn("Waited 10 seconds for the worker threads to die, but they are still running.");
+ }
+ } catch (final InterruptedException e) {
+ log.warn("Waited 10 seconds for the worker threads to die, but they are still running.");
}
+
+ // Stop the source of new Change Logs.
+ try {
+ changeLogSource.stopAndWait();
+ } catch(final UncheckedExecutionException e) {
+ log.warn("Could not stop the Change Log Source.", e);
+ }
+
+ // Stop the query execution framework.
+ try {
+ queryExecutor.stopAndWait();
+ } catch(final UncheckedExecutionException e) {
+ log.warn("Could not stop the Query Executor", e);
+ }
+
+ // Notify the service was successfully stopped.
+ notifyStopped();
+
+ log.info("QueryManager has finished stopping.");
}
/**
- * An implementation of {@link QueryChangeLogListener} for the
- * {@link QueryManager}.
- * <p>
- * When notified of a {@link ChangeType} performs one of the following:
- * <li>{@link ChangeType#CREATE}: Creates a new query using the
- * {@link QueryExecutor} provided to the {@link QueryManager}</li>
- * <li>{@link ChangeType#DELETE}: Deletes a running query by stopping the
- * {@link QueryExecutor} service of the queryID in the event</li>
- * <li>{@link ChangeType#UPDATE}: If the query is running and the update is
- * to stop the query, stops the query. Otherwise, if the query is not
- * running, it is removed.</li>
+ * Offer a unit of work to a blocking queue until it is either accepted, or the
+ * shutdown signal is set.
+ *
+ * @param workQueue - The blocking work queue to write to. (not null)
+ * @param event - The event that will be offered to the work queue. (not null)
+ * @param offerValue - How long to wait when offering new work.
+ * @param offerUnits - The unit for the {@code offerValue}. (not null)
+ * @param shutdownSignal - Used to signal application shutdown has started, so
+ * this method may terminate without ever placing the event on the queue. (not null)
+ * @return {@code true} if the evet nwas added to the queue, otherwise false.
*/
- private class QueryExecutionForwardingListener implements QueryChangeLogListener {
- private final String ryaInstanceName;
+ private static <T> boolean offerUntilAcceptedOrShutdown(
+ final BlockingQueue<T> workQueue,
+ final T event,
+ final long offerValue,
+ final TimeUnit offerUnits,
+ final AtomicBoolean shutdownSignal) {
+ requireNonNull(workQueue);
+ requireNonNull(event);
+ requireNonNull(shutdownSignal);
+
+ boolean submitted = false;
+ while(!submitted && !shutdownSignal.get()) {
+ try {
+ submitted = workQueue.offer(event, offerValue, offerUnits);
+ if(!submitted) {
+ log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again...");
+ }
+ } catch (final InterruptedException e) {
+ log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again...");
+ }
+ }
+ return submitted;
+ }
+
+ /**
+ * An observation that a {@link QueryChangeLog} was created within or
+ * removed from a {@link QueryChangeLogSource}.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class LogEvent {
/**
- * Creates a new {@link QueryExecutionForwardingListener}.
- *
- * @param ryaInstanceName - The rya instance the query change is
- * performed on. (not null)
+ * The types of events that may be observed.
*/
- public QueryExecutionForwardingListener(final String ryaInstanceName) {
- this.ryaInstanceName = requireNonNull(ryaInstanceName);
+ static enum LogEventType {
+ /**
+ * A {@link QueryChangeLog} was created within a {@link QueryChangeLogSource}.
+ */
+ CREATE,
+
+ /**
+ * A {@link QueryChangeLog} was deleted from a {@link QueryChangeLogSource}.
+ */
+ DELETE;
+ }
+
+ private final String ryaInstance;
+ private final LogEventType eventType;
+ private final Optional<QueryChangeLog> log;
+
+ /**
+ * Constructs an instance of {@link LogEvent}.
+ *
+ * @param ryaInstance - The Rya Instance the log is/was for. (not null)
+ * @param eventType - The type of event that was observed. (not null)
+ * @param log - The log if this is a create event. (not null)
+ */
+ private LogEvent(final String ryaInstance, final LogEventType eventType, final Optional<QueryChangeLog> log) {
+ this.ryaInstance = requireNonNull(ryaInstance);
+ this.eventType = requireNonNull(eventType);
+ this.log = requireNonNull(log);
+ }
+
+ /**
+ * @return The Rya Instance whose log was either created or deleted.
+ */
+ public String getRyaInstanceName() {
+ return ryaInstance;
+ }
+
+ /**
+ * @return The type of event that was observed.
+ */
+ public LogEventType getEventType() {
+ return eventType;
+ }
+
+ /**
+ * @return The {@link QueryChangeLog} if this is a CREATE event.
+ */
+ public Optional<QueryChangeLog> getQueryChangeLog() {
+ return log;
}
@Override
- public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
- LOG.debug("New query change event.");
- final QueryChange entry = queryChangeEvent.getEntry();
+ public String toString() {
+ return "LogEvent {\n" +
+ " Rya Instance: " + ryaInstance + ",\n" +
+ " Event Type: " + eventType + "\n" +
+ "}";
+ }
- lock.lock();
- try {
+ /**
+ * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was created within a
+ * {@link QueryChangeLogSource}.
+ *
+ * @param ryaInstance - The Rya Instance the created log is for. (not null)
+ * @param log - The created {@link QueryChangeLog. (not null)
+ * @return A {@link LogEvent} built using the provided values.
+ */
+ public static LogEvent create(final String ryaInstance, final QueryChangeLog log) {
+ return new LogEvent(ryaInstance, LogEventType.CREATE, Optional.of(log));
+ }
- switch (entry.getChangeType()) {
- case CREATE:
- if(!newQueryState.isPresent()) {
- LOG.error("The query with ID: " + entry.getQueryId() + " must be present with the change to be created.");
- LOG.debug("newQueryState is not allowed to be absent with a CREATE QueryChange, there might be a bug in the QueryRepository.");
- } else {
- runQuery(ryaInstanceName, newQueryState.get());
- }
- break;
- case DELETE:
- stopQuery(entry.getQueryId());
- break;
- case UPDATE:
- if (!newQueryState.isPresent()) {
- LOG.error("The query with ID: " + entry.getQueryId() + " must be provided with the update, cannot perform update.");
- LOG.debug("newQueryState is not allowed to be absent with a UPDATE QueryChange, there might be a bug in the QueryRepository.");
- } else {
- final StreamsQuery updatedQuery = newQueryState.get();
- if (updatedQuery.isActive()) {
- runQuery(ryaInstanceName, updatedQuery);
- LOG.info("Starting query: " + updatedQuery.toString());
- } else {
- stopQuery(updatedQuery.getQueryId());
- LOG.info("Stopping query: " + updatedQuery.toString());
- }
- }
- break;
- }
- } finally {
- lock.unlock();
- }
+ /**
+ * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was deleted from
+ * a {@link QueryChangeLogSource}.
+ *
+ * @param ryaInstance - The Rya Instance whose log was deleted. (not null)
+ * @return A {@link LogEvent} built using the provided values.
+ */
+ public static LogEvent delete(final String ryaInstance) {
+ return new LogEvent(ryaInstance, LogEventType.DELETE, Optional.empty());
}
}
/**
- * Listener used by the {@link QueryManager} to be notified when
- * {@link QueryChangeLog}s are created or deleted.
+ * An observation that a {@link StreamsQuery} needs to be executing or not
+ * via the provided {@link QueryExecutor}.
*/
- private class QueryManagerSourceListener implements SourceListener {
+ @DefaultAnnotation(NonNull.class)
+ static class QueryEvent {
+
+ /**
+ * The type of events that may be observed.
+ */
+ public static enum QueryEventType {
+ /**
+ * Indicates a {@link StreamsQuery} needs to be executing.
+ */
+ EXECUTING,
+
+ /**
+ * Indicates a {@link StreamsQuery} needs to be stopped.
+ */
+ STOPPED,
+
+ /**
+ * Indicates all {@link StreamsQuery}s for a Rya instance need to be stopped.
+ */
+ STOP_ALL;
+ }
+
+ private final String ryaInstance;
+ private final QueryEventType type;
+ private final Optional<UUID> queryId;
+ private final Optional<StreamsQuery> query;
+
+ /**
+ * Constructs an instance of {@link QueryEvent}.
+ *
+ * @param ryaInstance - The Rya instance that generated the event. (not null)
+ * @param type - Indicates whether the query needs to be executing or not. (not null)
+ * @param queryId - If stopped, the ID of the query that must not be running. (not null)
+ * @param query - If executing, the StreamsQuery that defines what should be executing. (not null)
+ */
+ private QueryEvent(
+ final String ryaInstance,
+ final QueryEventType type,
+ final Optional<UUID> queryId,
+ final Optional<StreamsQuery> query) {
+ this.ryaInstance = requireNonNull(ryaInstance);
+ this.type = requireNonNull(type);
+ this.queryId = requireNonNull(queryId);
+ this.query = requireNonNull(query);
+ }
+
+ /**
+ * @return The Rya instance that generated the event.
+ */
+ public String getRyaInstance() {
+ return ryaInstance;
+ }
+
+ /**
+ * @return Indicates whether the query needs to be executing or not.
+ */
+ public QueryEventType getType() {
+ return type;
+ }
+
+ /**
+ * @return If stopped, the ID of the query that must not be running. Otherwise absent.
+ */
+ public Optional<UUID> getQueryId() {
+ return queryId;
+ }
+
+ /**
+ * @return If executing, the StreamsQuery that defines what should be executing. Otherwise absent.
+ */
+ public Optional<StreamsQuery> getStreamsQuery() {
+ return query;
+ }
+
@Override
- public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
- lock.lock();
- try {
- LOG.info("Discovered new Query Change Log for Rya Instance " + ryaInstanceName + ".");
- final QueryRepository repo = new InMemoryQueryRepository(log, scheduler);
- repo.startAndWait();
- final Set<StreamsQuery> queries = repo.subscribe(new QueryExecutionForwardingListener(ryaInstanceName));
- queries.forEach(query -> {
- if (query.isActive()) {
- try {
- queryExecutor.startQuery(ryaInstanceName, query);
- } catch (IllegalStateException | QueryExecutorException e) {
- LOG.error("Unable to start query for rya instance " + ryaInstanceName, e);
- }
- }
- });
- queryRepos.put(ryaInstanceName, repo);
- } finally {
- lock.unlock();
+ public int hashCode() {
+ return Objects.hash(ryaInstance, type, queryId, query);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if(o instanceof QueryEvent) {
+ final QueryEvent other = (QueryEvent) o;
+ return Objects.equals(ryaInstance, other.ryaInstance) &&
+ Objects.equals(type, other.type) &&
+ Objects.equals(queryId, other.queryId) &&
+ Objects.equals(query, other.query);
}
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder string = new StringBuilder();
+ string.append("Query Event {\n")
+ .append(" Rya Instance: ").append(ryaInstance).append(",\n")
+ .append(" Type: ").append(type).append(",\n");
+ switch(type) {
+ case EXECUTING:
+ append(string, query.get());
+ break;
+ case STOPPED:
+ string.append(" Query ID: ").append(queryId.get()).append("\n");
+ break;
+ case STOP_ALL:
+ break;
+ default:
+ // Default to showing everything that is in the object.
+ string.append(" Query ID: ").append(queryId.get()).append("\n");
+ append(string, query.get());
+ break;
+ }
+ string.append("}");
+ return string.toString();
+ }
+
+ private void append(final StringBuilder string, final StreamsQuery query) {
+ requireNonNull(string);
+ requireNonNull(query);
+ string.append(" Streams Query {\n")
+ .append(" Query ID: ").append(query.getQueryId()).append(",\n")
+ .append(" Is Active: ").append(query.isActive()).append(",\n")
+ .append(" SPARQL: ").append(query.getSparql()).append("\n")
+ .append(" }");
+ }
+
+ /**
+ * Create a {@link QueryEvent} that indicates a query needs to be executing.
+ *
+ * @param ryaInstance - The Rya instance that generated the event. (not null)
+ * @param query - The StreamsQuery that defines what should be executing. (not null)
+ * @return A {@link QueryEvent} built using the provided values.
+ */
+ public static QueryEvent executing(final String ryaInstance, final StreamsQuery query) {
+ return new QueryEvent(ryaInstance, QueryEventType.EXECUTING, Optional.empty(), Optional.of(query));
+ }
+
+ /**
+ * Create a {@link QueryEvent} that indicates a query needs to be stopped.
+ *
+ * @param ryaInstance - The Rya instance that generated the event. (not null)
+ * @param queryId - The ID of the query that must not be running. (not null)
+ * @return A {@link QueryEvent} built using the provided values.
+ */
+ public static QueryEvent stopped(final String ryaInstance, final UUID queryId) {
+ return new QueryEvent(ryaInstance, QueryEventType.STOPPED, Optional.of(queryId), Optional.empty());
+ }
+
+ /**
+ * Create a {@link QueryEvent} that indicates all queries for a Rya instance needs to be stopped.
+ *
+ * @param ryaInstance - The Rya instance that generated the event. (not null)
+ * @return A {@link QueryEvent} built using the provided values.
+ */
+ public static QueryEvent stopALL(final String ryaInstance) {
+ return new QueryEvent(ryaInstance, QueryEventType.STOP_ALL, Optional.empty(), Optional.empty());
+ }
+ }
+
+ /**
+ * Listens to a {@link QueryChangeLogSource} and adds observations to the provided
+ * work queue. It does so until the provided shutdown signal is set.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class LogEventWorkGenerator implements SourceListener {
+
+ private final BlockingQueue<LogEvent> workQueue;
+ private final AtomicBoolean shutdownSignal;
+ private final long offerValue;
+ private final TimeUnit offerUnits;
+
+ /**
+ * Constructs an instance of {@link QueryManagerSourceListener}.
+ *
+ * @param workQueue - A blocking queue that will have {@link LogEvent}s offered to it. (not null)
+ * @param offerValue - How long to wait when offering new work.
+ * @param offerUnits - The unit for the {@code offerValue}. (not null)
+ * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
+ * to the work queue because the application is shutting down. (not null)
+ */
+ public LogEventWorkGenerator(
+ final BlockingQueue<LogEvent> workQueue,
+ final long offerValue,
+ final TimeUnit offerUnits,
+ final AtomicBoolean shutdownSignal) {
+ this.workQueue = requireNonNull(workQueue);
+ this.shutdownSignal = requireNonNull(shutdownSignal);
+ this.offerValue = offerValue;
+ this.offerUnits = requireNonNull(offerUnits);
+ }
+
+ @Override
+ public void notifyCreate(final String ryaInstanceName, final QueryChangeLog changeLog) {
+ log.info("A new Query Change Log has been discovered for Rya Instance " + ryaInstanceName + ". All " +
+ "queries that are set to active within it will be started.");
+
+ // Create an event that summarizes this notification.
+ final LogEvent event = LogEvent.create(ryaInstanceName, changeLog);
+
+ // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+ offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
}
@Override
public void notifyDelete(final String ryaInstanceName) {
- lock.lock();
+ log.info("The Query Change Log for Rya Instance " + ryaInstanceName + " has been deleted. All of the " +
+ "queries related to that instance will be stopped.");
+
+ // Create an event that summarizes this notification.
+ final LogEvent event = LogEvent.delete(ryaInstanceName);
+
+ // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+ offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
+ }
+ }
+
+ /**
+ * Processes a work queue of {@link LogEvent}s.
+ * <p/>
+ * Whenever a new log has been created, then it registers a {@link QueryEventWorkGenerator}
+ * that generates {@link QueryEvent}s based on the content and updates to the discovered
+ * {@link QueryChagneLog}.
+ * <p/>
+ * Whenever a log is deleted, then the generator is stopped and a stop all {@link QueryEvent}
+ * is written to the work queue.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class LogEventWorker implements Runnable {
+
+ /**
+ * A map of Rya Instance name to he Query Repository for that instance.
+ */
+ private final Map<String, QueryRepository> repos = new HashMap<>();
+
+ private final BlockingQueue<LogEvent> logWorkQueue;
+ private final BlockingQueue<QueryEvent> queryWorkQueue;
+ private final long blockingValue;
+ private final TimeUnit blockingUnits;
+ private final AtomicBoolean shutdownSignal;
+
+ /**
+ * Constructs an instance of {@link LogEventWorker}.
+ *
+ * @param logWorkQueue - A queue of {@link LogEvent}s that will be worked by this object. (not null)
+ * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
+ * @param blockingValue - How long to wait when polling/offering new work.
+ * @param blockingUnits - The unit for the {@code blockingValue}. (not null)
+ * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
+ * may exit the {@link #run()} method. (not null)
+ */
+ public LogEventWorker(
+ final BlockingQueue<LogEvent> logWorkQueue,
+ final BlockingQueue<QueryEvent> queryWorkQueue,
+ final long blockingValue,
+ final TimeUnit blockingUnits,
+ final AtomicBoolean shutdownSignal) {
+ this.logWorkQueue = requireNonNull(logWorkQueue);
+ this.queryWorkQueue = requireNonNull(queryWorkQueue);
+ this.blockingValue = blockingValue;
+ this.blockingUnits = requireNonNull(blockingUnits);
+ this.shutdownSignal = requireNonNull(shutdownSignal);
+ }
+
+ @Override
+ public void run() {
+ // Run until the shutdown signal is set.
+ while(!shutdownSignal.get()) {
+ try {
+ // Pull a unit of work from the queue.
+ log.debug("LogEventWorker - Polling the work queue for a new LogEvent.");
+ final LogEvent logEvent = logWorkQueue.poll(blockingValue, blockingUnits);
+ if(logEvent == null) {
+ // Poll again if nothing was found.
+ continue;
+ }
+
+ log.info("LogEventWorker - handling: \n" + logEvent);
+ final String ryaInstance = logEvent.getRyaInstanceName();
+
+ switch(logEvent.getEventType()) {
+ case CREATE:
+ // If we see a create message for a Rya Instance we are already maintaining,
+ // then don't do anything.
+ if(repos.containsKey(ryaInstance)) {
+ log.warn("LogEventWorker - A repository is already being managed for the Rya Instance " +
+ ryaInstance + ". This message will be ignored.");
+ continue;
+ }
+
+ // Create and start a QueryRepository for the discovered log. Hold onto the repository
+ // so that it may be shutdown later.
+ final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits);
+ final QueryRepository repo = new InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler);
+ repo.startAndWait();
+ repos.put(ryaInstance, repo);
+
+ // Subscribe a worker that adds the Query Events to the queryWorkQueue queue.
+ // A count down latch is used to ensure the returned set of queries are handled
+ // prior to any notifications from the repository.
+ final CountDownLatch subscriptionWorkFinished = new CountDownLatch(1);
+ final QueryEventWorkGenerator queryWorkGenerator =
+ new QueryEventWorkGenerator(ryaInstance, subscriptionWorkFinished, queryWorkQueue,
+ blockingValue, blockingUnits, shutdownSignal);
+
+ log.debug("LogEventWorker - Setting up a QueryWorkGenerator...");
+ final Set<StreamsQuery> queries = repo.subscribe(queryWorkGenerator);
+ log.debug("LogEventWorker - Finished setting up a QueryWorkGenerator.");
+
+ // Handle the view of the queries within the repository as it existed when
+ // the subscription was registered.
+ queries.stream()
+ .forEach(query -> {
+ // Create a QueryEvent that represents the active state of the existing query.
+ final QueryEvent queryEvent = query.isActive() ?
+ QueryEvent.executing(ryaInstance, query) : QueryEvent.stopped(ryaInstance, query.getQueryId());
+ log.debug("LogEventWorker - offering: " + queryEvent);
+
+ // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+ offerUntilAcceptedOrShutdown(queryWorkQueue, queryEvent, blockingValue, blockingUnits, shutdownSignal);
+ });
+
+ // Indicate the subscription work is finished so that the registered listener may start
+ // adding work to the queue.
+ log.info("LogEventWorker - Counting down the subscription work latch.");
+ subscriptionWorkFinished.countDown();
+ break;
+
+ case DELETE:
+ if(repos.containsKey(ryaInstance)) {
+ // Shut down the query repository for the Rya instance. This ensures the listener will
+ // not receive any more work that needs to be done.
+ final QueryRepository deletedRepo = repos.remove(ryaInstance);
+ deletedRepo.stopAndWait();
+
+ // Add work that stops all of the queries related to the instance.
+ final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
+ offerUntilAcceptedOrShutdown(queryWorkQueue, stopAllEvent, blockingValue, blockingUnits, shutdownSignal);
+ }
+ break;
+ }
+ } catch (final InterruptedException e) {
+ log.debug("LogEventWorker did not see any new events over the past 5 seconds. Polling again...");
+ }
+ }
+
+ log.info("LogEventWorker shutting down...");
+
+ // Shutdown all of the QueryRepositories that were started.
+ repos.values().forEach(repo -> repo.stopAndWait());
+
+ log.info("LogEventWorker shut down.");
+ }
+ }
+
+ /**
+ * Listens to a {@link QueryRepository} and adds observations to the provided work queue.
+ * It does so until the provided shutdown signal is set.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class QueryEventWorkGenerator implements QueryChangeLogListener {
+
+ private final String ryaInstance;
+ private final CountDownLatch subscriptionWorkFinished;
+ private final BlockingQueue<QueryEvent> queryWorkQueue;
+ private final long blockingValue;
+ private final TimeUnit blockingUnits;
+ private final AtomicBoolean shutdownSignal;
+
+ /**
+ * Constructs an instance of {@link QueryEventWorkGenerator}.
+ *
+ * @param ryaInstance - The rya instance whose log this objects is watching. (not null)
+ * @param subscriptionWorkFinished - Indicates when work that needs to be completed before this
+ * listener handles notifications is completed. (not null)
+ * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
+ * @param blockingValue - How long to wait when polling/offering new work.
+ * @param blockingUnits - The unit for the {@code blockingValue}. (not null)
+ * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
+ * to the work queue because the application is shutting down. (not null)
+ */
+ public QueryEventWorkGenerator(
+ final String ryaInstance,
+ final CountDownLatch subscriptionWorkFinished,
+ final BlockingQueue<QueryEvent> queryWorkQueue,
+ final long blockingValue,
+ final TimeUnit blockingUnits,
+ final AtomicBoolean shutdownSignal) {
+ this.ryaInstance = requireNonNull(ryaInstance);
+ this.subscriptionWorkFinished = requireNonNull(subscriptionWorkFinished);
+ this.queryWorkQueue = requireNonNull(queryWorkQueue);
+ this.blockingValue = blockingValue;
+ this.blockingUnits = requireNonNull(blockingUnits);
+ this.shutdownSignal = requireNonNull(shutdownSignal);
+ }
+
+ @Override
+ public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
+ requireNonNull(queryChangeEvent);
+ requireNonNull(newQueryState);
+
+ // Wait for the subscription work to be finished.
try {
- LOG.info("Notified of deleting QueryChangeLog, stopping all queries belonging to the change log for "
- + ryaInstanceName + ".");
- queryExecutor.stopAll(ryaInstanceName);
- } catch (final QueryExecutorException e) {
- LOG.error("Failed to stop all queries belonging to: " + ryaInstanceName, e);
- } finally {
- lock.unlock();
+ log.debug("Waiting for Subscription Work Finished latch to release...");
+ while(!shutdownSignal.get() && !subscriptionWorkFinished.await(blockingValue, blockingUnits)) {
+ log.debug("Still waiting...");
+ }
+ log.debug("Subscription Work Finished latch to released.");
+ } catch (final InterruptedException e) {
+ log.warn("Interrupted while waiting for the Subscription Work Finished latch to be " +
+ "released. Shutting down?", e);
+ }
+
+ // If we left the loop because of a shutdown, return immediately.
+ if(shutdownSignal.get()) {
+ log.debug("Not processing notification. Shutting down.");
+ return;
+ }
+
+ // Generate work from the notification.
+ final QueryChange change = queryChangeEvent.getEntry();
+ switch(change.getChangeType()) {
+ case CREATE:
+ if(newQueryState.isPresent()) {
+ log.info("Rya Instance " + ryaInstance + " created Rya Streams query " + newQueryState + ".");
+ final StreamsQuery newQuery = newQueryState.get();
+ if(newQuery.isActive()) {
+ final QueryEvent executeNewQuery = QueryEvent.executing(ryaInstance, newQuery);
+ offerUntilAcceptedOrShutdown(queryWorkQueue, executeNewQuery, blockingValue, blockingUnits, shutdownSignal);
+ }
+ } else {
+ log.error("Received a CREATE QueryChange for Rya Instance: " + ryaInstance +
+ ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
+ "StreamsQuery representing the created query. The query will not be processed.");
+ }
+ break;
+
+ case DELETE:
+ final UUID deletedQueryId = change.getQueryId();
+ log.info("Rya Instance " + ryaInstance + " deleted Rya Streams query with ID " + deletedQueryId);
+ final QueryEvent stopDeletedQuery = QueryEvent.stopped(ryaInstance, deletedQueryId);
+ offerUntilAcceptedOrShutdown(queryWorkQueue, stopDeletedQuery, blockingValue, blockingUnits, shutdownSignal);
+ break;
+
+ case UPDATE:
+ if(newQueryState.isPresent()) {
+ final StreamsQuery updatedQuery = newQueryState.get();
+ if(updatedQuery.isActive()) {
+ log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
+ updatedQuery.getQueryId() + " to be active.");
+ final QueryEvent executeUpdatedQuery = QueryEvent.executing(ryaInstance, updatedQuery);
+ offerUntilAcceptedOrShutdown(queryWorkQueue, executeUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
+ } else {
+ log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
+ updatedQuery.getQueryId() + " to be inactive.");
+ final QueryEvent stopUpdatedQuery = QueryEvent.stopped(ryaInstance, updatedQuery.getQueryId());
+ offerUntilAcceptedOrShutdown(queryWorkQueue, stopUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
+ }
+ } else {
+ log.error("Received an UPDATE QueryChange for Rya Instance: " + ryaInstance +
+ ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
+ "StreamsQuery representing the created query. The query will not be processed.");
+ }
+ break;
}
}
}
-}
+
+ /**
+ * Processes a work queue of {@link QueryEvent}s.
+ * <p/>
+ * Each type of event maps the to corresponding method on {@link QueryExecutor} that is called into.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class QueryEventWorker implements Runnable {
+
+ private final BlockingQueue<QueryEvent> workQueue;
+ private final QueryExecutor queryExecutor;
+ private final long pollingValue;
+ private final TimeUnit pollingUnits;
+ private final AtomicBoolean shutdownSignal;
+
+ /**
+ * Constructs an instance of {@link QueryEventWorker}.
+ *
+ * @param workQueue - A queue of {@link QueryEvent}s that will be worked by this object. (not null)
+ * @param queryExecutor - Responsible for executing the {@link StreamsQuery}s. (not null)
+ * @param pollingValue - How long to wait when polling for new work.
+ * @param pollingUnits - The units for the {@code pollingValue}. (not null)
+ * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
+ * may exit the {@link #run()} method. (not null)
+ */
+ public QueryEventWorker(
+ final BlockingQueue<QueryEvent> workQueue,
+ final QueryExecutor queryExecutor,
+ final long pollingValue,
+ final TimeUnit pollingUnits,
+ final AtomicBoolean shutdownSignal) {
+ this.workQueue = requireNonNull(workQueue);
+ this.queryExecutor = requireNonNull(queryExecutor);
+ this.pollingValue = pollingValue;
+ this.pollingUnits = requireNonNull(pollingUnits);
+ this.shutdownSignal = requireNonNull(shutdownSignal);
+ }
+
+ @Override
+ public void run() {
+ log.info("QueryEventWorker starting.");
+
+ // Run until the shutdown signal is set.
+ while(!shutdownSignal.get()) {
+ // Pull a unit of work from the queue.
+ try {
+ log.debug("Polling the work queue for a new QueryEvent.");
+ final QueryEvent event = workQueue.poll(pollingValue, pollingUnits);
+ if(event == null) {
+ // Poll again if nothing was found.
+ continue;
+ }
+
+ log.info("QueryEventWorker handling:\n" + event);
+
+ // Ensure the state within the executor matches the query event's state.
+ switch(event.getType()) {
+ case EXECUTING:
+ try {
+ queryExecutor.startQuery(event.getRyaInstance(), event.getStreamsQuery().get());
+ } catch (final IllegalStateException | QueryExecutorException e) {
+ log.error("Could not start a query represented by the following work: " + event, e);
+ }
+ break;
+
+ case STOPPED:
+ try {
+ queryExecutor.stopQuery(event.getQueryId().get());
+ } catch (final IllegalStateException | QueryExecutorException e) {
+ log.error("Could not stop a query represented by the following work: " + event, e);
+ }
+ break;
+
+ case STOP_ALL:
+ try {
+ queryExecutor.stopAll(event.getRyaInstance());
+ } catch (final IllegalStateException | QueryExecutorException e) {
+ log.error("Could not stop all queries represented by the following work: " + event, e);
+ }
+ break;
+ }
+ } catch (final InterruptedException e) {
+ log.debug("QueryEventWorker interrupted. Probably shutting down.");
+ }
+ }
+ log.info("QueryEventWorker shut down.");
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
index 515d699..04a0382 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
@@ -33,6 +33,7 @@
import org.apache.commons.daemon.DaemonInitException;
import org.apache.rya.streams.kafka.KafkaStreamsFactory;
import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource;
import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor;
import org.apache.rya.streams.querymanager.xml.Kafka;
@@ -91,7 +92,7 @@
// Unmarshall the configuration file into an object.
final QueryManagerConfig config;
- try(InputStream stream = Files.newInputStream(configFile)) {
+ try(final InputStream stream = Files.newInputStream(configFile)) {
config = QueryManagerConfigUnmarshaller.unmarshall(stream);
} catch(final JAXBException | SAXException e) {
throw new DaemonInitException("Unable to marshall the configuration XML file: " + configFile, e);
@@ -110,11 +111,12 @@
final QueryChangeLogSource source = new KafkaQueryChangeLogSource(kafka.getHostname(), kafka.getPort(), scheduler);
// Initialize a QueryExecutor.
+ final String zookeeperServers = config.getQueryExecutor().getLocalKafkaStreams().getZookeepers();
final KafkaStreamsFactory streamsFactory = new SingleThreadKafkaStreamsFactory(kafka.getHostname() + ":" + kafka.getPort());
- final QueryExecutor queryExecutor = new LocalQueryExecutor(streamsFactory);
+ final QueryExecutor queryExecutor = new LocalQueryExecutor(new CreateKafkaTopic(zookeeperServers), streamsFactory);
// Initialize the QueryManager using the configured resources.
- manager = new QueryManager(queryExecutor, source, scheduler);
+ manager = new QueryManager(queryExecutor, source, period, units);
}
@Override
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
index 32305f5..e746baf 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
@@ -33,11 +33,12 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
import org.apache.rya.streams.querymanager.QueryChangeLogSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractScheduledService;
@@ -53,6 +54,8 @@
@DefaultAnnotation(NonNull.class)
public class KafkaQueryChangeLogSource extends AbstractScheduledService implements QueryChangeLogSource {
+ private static final Logger log = LoggerFactory.getLogger(KafkaQueryChangeLogSource.class);
+
/**
* Ensures thread safe interactions with this object.
*/
@@ -74,10 +77,10 @@
private final Set<SourceListener> listeners = new HashSet<>();
/**
- * Maps Rya instance name to a Query Change Log for that instance. This map is used to keep
- * track of how the change logs change over time within the Kafka Server.
+ * Maps Rya instance names to the Query Change Log topic name in Kafka. This map is used to
+ * keep track of how the change logs change over time within the Kafka Server.
*/
- private final HashMap<String, QueryChangeLog> knownChangeLogs = new HashMap<>();
+ private final HashMap<String, String> knownChangeLogs = new HashMap<>();
/**
* A consumer that is used to poll the Kafka Server for topics.
@@ -101,6 +104,8 @@
@Override
protected void startUp() throws Exception {
+ log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " starting up...");
+
// Setup the consumer that is used to list topics for the source.
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
@@ -108,17 +113,23 @@
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
listTopicsConsumer = new KafkaConsumer<>(consumerProperties);
+
+ log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " started.");
}
@Override
protected void shutDown() throws Exception {
- // Shut down the consumer that's used to list topics.
- listTopicsConsumer.close();
+ log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " shutting down...");
- // Shut down all of the change logs that were created within this class.
- for(final QueryChangeLog changeLog : knownChangeLogs.values()) {
- changeLog.close();
+ lock.lock();
+ try {
+ // Shut down the consumer that's used to list topics.
+ listTopicsConsumer.close();
+ } finally {
+ lock.unlock();
}
+
+ log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " shut down.");
}
@Override
@@ -130,8 +141,10 @@
listeners.add(listener);
// Notify it with everything that already exists.
- for(final Entry<String, QueryChangeLog> entry : knownChangeLogs.entrySet()) {
- listener.notifyCreate(entry.getKey(), entry.getValue());
+ for(final Entry<String, String> entry : knownChangeLogs.entrySet()) {
+ final String changeLogTopic = entry.getValue();
+ final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
+ listener.notifyCreate(entry.getKey(), changeLog);
}
} finally {
lock.unlock();
@@ -174,26 +187,23 @@
// Handle the deletes.
for(final String deletedRyaInstance : deletedRyaInstances) {
// Remove the change log from the set of known logs.
- final QueryChangeLog removed = knownChangeLogs.remove(deletedRyaInstance);
+ knownChangeLogs.remove(deletedRyaInstance);
- // Notify the listeners of the update.
+ // Notify the listeners of the update so that they may close the previously provided change log.
for(final SourceListener listener : listeners) {
listener.notifyDelete(deletedRyaInstance);
}
-
- // Ensure the change log is closed.
- removed.close();
}
// Handle the adds.
for(final String createdRyaInstance : createdRyaInstances) {
// Create and store the ChangeLog.
final String changeLogTopic = KafkaTopics.queryChangeLogTopic(createdRyaInstance);
- final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
- knownChangeLogs.put(createdRyaInstance, changeLog);
+ knownChangeLogs.put(createdRyaInstance, changeLogTopic);
// Notify the listeners of the update.
for(final SourceListener listener : listeners) {
+ final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
listener.notifyCreate(createdRyaInstance, changeLog);
}
}
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
index 947a215..3a59636 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -32,10 +32,15 @@
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.kafka.KafkaStreamsFactory;
import org.apache.rya.streams.kafka.KafkaStreamsFactory.KafkaStreamsFactoryException;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
import org.apache.rya.streams.querymanager.QueryExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -50,6 +55,7 @@
*/
@DefaultAnnotation(NonNull.class)
public class LocalQueryExecutor extends AbstractIdleService implements QueryExecutor {
+ private static final Logger log = LoggerFactory.getLogger(LocalQueryExecutor.class);
/**
* Provides thread safety when interacting with this class.
@@ -72,6 +78,11 @@
private final Map<UUID, KafkaStreams> byQueryId = new HashMap<>();
/**
+ * Used to create the input and output topics for a Kafka Streams job.
+ */
+ private final CreateKafkaTopic createKafkaTopic;
+
+ /**
* Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s.
*/
private final KafkaStreamsFactory streamsFactory;
@@ -79,23 +90,31 @@
/**
* Constructs an instance of {@link LocalQueryExecutor}.
*
+ * @param createKafkaTopic - Used to create the input and output topics for a Kafka Streams job. (not null)
* @param streamsFactory - Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s. (not null)
*/
- public LocalQueryExecutor(final KafkaStreamsFactory streamsFactory) {
+ public LocalQueryExecutor(
+ final CreateKafkaTopic createKafkaTopic,
+ final KafkaStreamsFactory streamsFactory) {
+ this.createKafkaTopic = requireNonNull(createKafkaTopic);
this.streamsFactory = requireNonNull(streamsFactory);
}
@Override
protected void startUp() throws Exception {
- // Nothing to do.
+ log.info("Local Query Executor starting up.");
}
@Override
protected void shutDown() throws Exception {
+ log.info("Local Query Executor shutting down. Stopping all jobs...");
+
// Stop all of the running queries.
for(final KafkaStreams job : byQueryId.values()) {
job.close();
}
+
+ log.info("Local Query Executor shut down.");
}
@Override
@@ -106,6 +125,14 @@
lock.lock();
try {
+ // Make sure the Statements topic exists for the query.
+ final Set<String> topics = Sets.newHashSet(
+ KafkaTopics.statementsTopic(ryaInstance),
+ KafkaTopics.queryResultsTopic(query.getQueryId()));
+
+ // Make sure the Query Results topic exists for the query.
+ createKafkaTopic.createTopics(topics, 1, 1);
+
// Setup the Kafka Streams job that will execute.
final KafkaStreams streams = streamsFactory.make(ryaInstance, query);
streams.start();
diff --git a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
index c1285d4..21170bb 100644
--- a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
+++ b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
@@ -30,6 +30,13 @@
</xs:choice>
</xs:complexType>
</xs:element>
+ <xs:element name="queryExecutor">
+ <xs:complexType>
+ <xs:choice>
+ <xs:element name="localKafkaStreams" type="localKafkaStreams"/>
+ </xs:choice>
+ </xs:complexType>
+ </xs:element>
<xs:element name="performanceTunning">
<xs:complexType>
<xs:sequence>
@@ -65,6 +72,13 @@
</xs:sequence>
</xs:complexType>
+ <!-- Define what a local Kafka Streams query executor looks like. -->
+ <xs:complexType name="localKafkaStreams">
+ <xs:sequence>
+ <xs:element name="zookeepers" type="xs:string"/>
+ </xs:sequence>
+ </xs:complexType>
+
<!-- Define the legal range for a TCP port. -->
<xs:simpleType name="tcpPort">
<xs:restriction base="xs:int">
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
new file mode 100644
index 0000000..da9be78
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent.LogEventType;
+import org.apache.rya.streams.querymanager.QueryManager.LogEventWorkGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link LogEventWorkGenerator}.
+ */
+public class LogEventWorkGeneratorTest {
+
+ @Test
+ public void shutdownSignalKillsThread() {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the LogEventWorkGenerator work.
+ final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a created change log.
+ final Thread notifyThread = new Thread(() -> {
+ generator.notifyCreate("rya", mock(QueryChangeLog.class));
+ });
+
+ // Fill the queue so that nothing may be offered to it.
+ queue.offer(LogEvent.delete("rya"));
+
+ // Start the thread and show that it is still alive after the offer period.
+ notifyThread.start();
+ assertTrue( ThreadUtil.stillAlive(notifyThread, 200) );
+
+ // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+ shutdownSignal.set(true);
+ assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) );
+ }
+
+ @Test
+ public void notifyCreate() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the LogEventWorkGenerator work.
+ final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a created change log.
+ final CountDownLatch notified = new CountDownLatch(1);
+ final Thread notifyThread = new Thread(() -> {
+ generator.notifyCreate("rya", mock(QueryChangeLog.class));
+ notified.countDown();
+ });
+
+ try {
+ // Start the thread that performs the notification.
+ notifyThread.start();
+
+ // Wait for the thread to indicate it has notified and check the queue for the value.
+ notified.await(200, TimeUnit.MILLISECONDS);
+ final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS);
+ assertEquals(LogEventType.CREATE, event.getEventType());
+ assertEquals("rya", event.getRyaInstanceName());
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+
+ @Test
+ public void notifyDelete() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the LogEventWorkGenerator work.
+ final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a deleted change log.
+ final CountDownLatch notified = new CountDownLatch(1);
+ final Thread notifyThread = new Thread(() -> {
+ generator.notifyDelete("rya");
+ notified.countDown();
+ });
+
+ try {
+ // Start the thread that performs the notification.
+ notifyThread.start();
+
+ // Wait for the thread to indicate it has notified and check the queue for the value.
+ notified.await(200, TimeUnit.MILLISECONDS);
+ final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS);
+ assertEquals(LogEventType.DELETE, event.getEventType());
+ assertEquals("rya", event.getRyaInstanceName());
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
new file mode 100644
index 0000000..cb708ed
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent;
+import org.apache.rya.streams.querymanager.QueryManager.LogEventWorker;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link LogEventWorker}.
+ */
+public class LogEventWorkerTest {
+
+ @Test
+ public void shutdownSignalKillsThread() {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The thread that will perform the LogEventWorker task.
+ final Thread logEventWorker = new Thread(new LogEventWorker(new ArrayBlockingQueue<>(1),
+ new ArrayBlockingQueue<>(1), 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ logEventWorker.start();
+
+ // Wait longer than the poll time to see if the thread died. Show that it is still running.
+ assertTrue(ThreadUtil.stillAlive(logEventWorker, 200));
+
+ // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+ shutdownSignal.set(true);
+ assertFalse(ThreadUtil.stillAlive(logEventWorker, 500));
+ }
+
+ @Test
+ public void nofity_logCreated_doesNotExist() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to feed work.
+ final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The queue work is written to.
+ final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The Query Change Log that will be watched.
+ final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+ // Write a message that indicates a new query should be active.
+ final UUID firstQueryId = UUID.randomUUID();
+ changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+
+ // Write a message that adds an active query, but then makes it inactive. Because both of these
+ // events are written to the log before the worker subscribes to the repository for updates, they
+ // must result in a single query stopped event.
+ final UUID secondQueryId = UUID.randomUUID();
+ changeLog.write(QueryChange.create(secondQueryId, "select * where { ?d ?e ?f . }", true));
+ changeLog.write(QueryChange.update(secondQueryId, false));
+
+ // Start the worker that will be tested.
+ final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+ queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ logEventWorker.start();
+
+ try {
+ // Write a unit of work that indicates a log was created.
+ final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+ logEventQueue.offer(createLogEvent);
+
+ // We must see the following Query Events added to the work queue.
+ // Query 1, executing.
+ // Query 2, stopped.
+ Set<QueryEvent> expectedEvents = new HashSet<>();
+ expectedEvents.add(QueryEvent.executing("rya",
+ new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+ expectedEvents.add(QueryEvent.stopped("rya", secondQueryId));
+
+ Set<QueryEvent> queryEvents = new HashSet<>();
+ queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+ queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+ assertEquals(expectedEvents, queryEvents);
+
+ // Write an event to the change log that stops the first query.
+ changeLog.write(QueryChange.update(firstQueryId, false));
+
+ // Show it was also reflected in the changes.
+ // Query 1, stopped.
+ expectedEvents = new HashSet<>();
+ expectedEvents.add(QueryEvent.stopped("rya", firstQueryId));
+
+ queryEvents = new HashSet<>();
+ queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+ assertEquals(expectedEvents, queryEvents);
+ } finally {
+ shutdownSignal.set(true);
+ logEventWorker.join();
+ }
+ }
+
+ @Test
+ public void nofity_logCreated_exists() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to feed work.
+ final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The queue work is written to.
+ final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The Query Change Log that will be watched.
+ final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+ // Write a message that indicates a new query should be active.
+ final UUID firstQueryId = UUID.randomUUID();
+ changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+
+ // Start the worker that will be tested.
+ final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+ queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ logEventWorker.start();
+
+ try {
+ // Write a unit of work that indicates a log was created.
+ final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+ logEventQueue.offer(createLogEvent);
+
+ // Say the same log was created a second time.
+ logEventQueue.offer(createLogEvent);
+
+ // Show that only a single unit of work was added for the log. This indicates the
+ // second message was effectively skipped as it would have add its work added twice otherwise.
+ final Set<QueryEvent> expectedEvents = new HashSet<>();
+ expectedEvents.add(QueryEvent.executing("rya",
+ new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+
+ final Set<QueryEvent> queryEvents = new HashSet<>();
+ queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+ assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+ assertEquals(expectedEvents, queryEvents);
+ } finally {
+ shutdownSignal.set(true);
+ logEventWorker.join();
+ }
+ }
+
+ @Test
+ public void notify_logDeleted_exists() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to feed work.
+ final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The queue work is written to.
+ final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The Query Change Log that will be watched.
+ final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+ // Start the worker that will be tested.
+ final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+ queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ logEventWorker.start();
+
+ try {
+ // Write a unit of work that indicates a log was created.
+ final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+ logEventQueue.offer(createLogEvent);
+
+ // Write a unit of work that indicates a log was deleted.
+ logEventQueue.offer(LogEvent.delete("rya"));
+
+ // Show that a single unit of work was created for deleting everything for "rya".
+ assertEquals(QueryEvent.stopALL("rya"), queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+ assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+ } finally {
+ shutdownSignal.set(true);
+ logEventWorker.join();
+ }
+ }
+
+ @Test
+ public void notify_logDeleted_doesNotExist() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to feed work.
+ final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The queue work is written to.
+ final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+ // Start the worker that will be tested.
+ final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+ queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ logEventWorker.start();
+
+ try {
+ // Write a unit of work that indicates a log was deleted. Since it was never created,
+ // this will not cause anything to be written to the QueryEvent queue.
+ logEventQueue.offer(LogEvent.delete("rya"));
+
+ // Show that a single unit of work was created for deleting everything for "rya".
+ assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+ } finally {
+ shutdownSignal.set(true);
+ logEventWorker.join();
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
new file mode 100644
index 0000000..4495e19
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEventWorkGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link QueryEventWorkGenerator}.
+ */
+public class QueryEventWorkGeneratorTest {
+
+ @Test
+ public void shutdownSignalKillsThread() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", new CountDownLatch(1), queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a created query.
+ final Thread notifyThread = new Thread(() -> {
+ generator.notify(mock(ChangeLogEntry.class), Optional.empty());
+ });
+
+ // Fill the queue so that nothing may be offered to it.
+ queue.offer(QueryEvent.stopALL("rya"));
+
+ // Start the thread and show that it is still alive after the offer period.
+ notifyThread.start();
+ assertTrue( ThreadUtil.stillAlive(notifyThread, 200) );
+
+ // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+ shutdownSignal.set(true);
+ assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) );
+ }
+
+ @Test
+ public void waitsForSubscriptionWork() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final CountDownLatch latch = new CountDownLatch(1);
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a created query.
+ final UUID queryId = UUID.randomUUID();
+ final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+ final Thread notifyThread = new Thread(() -> {
+ final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+ final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+ generator.notify(entry, Optional.of(query));
+ });
+
+ // Start the thread.
+ notifyThread.start();
+
+ try {
+ // Wait longer than the blocking period and show the thread is still alive and nothing has been added
+ // to the work queue.
+ Thread.sleep(150);
+ assertTrue( notifyThread.isAlive() );
+
+ // Count down the latch.
+ latch.countDown();
+
+ // Show work was added to the queue and the notifying thread died.
+ final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+ final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+ assertEquals(expected, event);
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+
+ @Test
+ public void notifyCreate() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final CountDownLatch latch = new CountDownLatch(1);
+ latch.countDown();
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a created query.
+ final UUID queryId = UUID.randomUUID();
+ final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+ final Thread notifyThread = new Thread(() -> {
+ final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+ final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+ generator.notify(entry, Optional.of(query));
+ });
+
+ // Start the thread.
+ notifyThread.start();
+
+ try {
+ // Show work was added to the queue and the notifying thread died.
+ final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+ final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+ assertEquals(expected, event);
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+
+ @Test
+ public void notifyDelete() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final CountDownLatch latch = new CountDownLatch(1);
+ latch.countDown();
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a deleted query.
+ final UUID queryId = UUID.randomUUID();
+ final Thread notifyThread = new Thread(() -> {
+ final QueryChange change = QueryChange.delete(queryId);
+ final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+ generator.notify(entry, Optional.empty());
+ });
+
+ // Start the thread.
+ notifyThread.start();
+
+ try {
+ // Show work was added to the queue and the notifying thread died.
+ final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+ final QueryEvent expected = QueryEvent.stopped("rya", queryId);
+ assertEquals(expected, event);
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+
+ @Test
+ public void notifyUpdate_isActive() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final CountDownLatch latch = new CountDownLatch(1);
+ latch.countDown();
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with an update query change.
+ final UUID queryId = UUID.randomUUID();
+ final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+ final Thread notifyThread = new Thread(() -> {
+ final QueryChange change = QueryChange.update(queryId, true);
+ final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+ generator.notify(entry, Optional.of(query));
+ });
+
+ // Start the thread.
+ notifyThread.start();
+
+ try {
+ // Show work was added to the queue and the notifying thread died.
+ final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+ final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+ assertEquals(expected, event);
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+
+ @Test
+ public void notifyUpdate_isNotActive() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final CountDownLatch latch = new CountDownLatch(1);
+ latch.countDown();
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with an update query change.
+ final UUID queryId = UUID.randomUUID();
+ final StreamsQuery query = new StreamsQuery(queryId, "query", false);
+ final Thread notifyThread = new Thread(() -> {
+ final QueryChange change = QueryChange.update(queryId, false);
+ final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+ generator.notify(entry, Optional.of(query));
+ });
+
+ // Start the thread.
+ notifyThread.start();
+
+ try {
+ // Show work was added to the queue and the notifying thread died.
+ final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+ final QueryEvent expected = QueryEvent.stopped("rya", queryId);
+ assertEquals(expected, event);
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
new file mode 100644
index 0000000..33c0719
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEventWorker;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link QueryManager.QueryEventWorker}.
+ */
+public class QueryEventWorkerTest {
+
+ @Test
+ public void shutdownSignalKillsThread() {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The thread that will perform the QueryEventWorker task.
+ final Thread queryEventWorker = new Thread(new QueryEventWorker(new ArrayBlockingQueue<>(1),
+ mock(QueryExecutor.class), 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ queryEventWorker.start();
+
+ // Wait longer than the poll time to see if the thread died. Show that it is still running.
+ assertTrue(ThreadUtil.stillAlive(queryEventWorker, 200));
+
+ // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+ shutdownSignal.set(true);
+ assertFalse(ThreadUtil.stillAlive(queryEventWorker, 1000));
+ }
+
+ @Test
+ public void executingWork() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to send the execute work to the thread.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The message that indicates a query needs to be executed.
+ final String ryaInstance = "rya";
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "sparql", true);
+ final QueryEvent executingEvent = QueryEvent.executing(ryaInstance, query);
+
+ // Release a latch if the startQuery method on the queryExecutor is invoked with the correct values.
+ final CountDownLatch startQueryInvoked = new CountDownLatch(1);
+ final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+ doAnswer(invocation -> {
+ startQueryInvoked.countDown();
+ return null;
+ }).when(queryExecutor).startQuery(ryaInstance, query);
+
+ // The thread that will perform the QueryEventWorker task.
+ final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+ queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ try {
+ queryEventWorker.start();
+
+ // Provide a message indicating a query needs to be executing.
+ queue.put(executingEvent);
+
+ // Verify the Query Executor was told to start the query.
+ assertTrue( startQueryInvoked.await(150, TimeUnit.MILLISECONDS) );
+ } finally {
+ shutdownSignal.set(true);
+ queryEventWorker.join();
+ }
+ }
+
+ @Test
+ public void stoppedWork() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to send the execute work to the thread.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The message that indicates a query needs to be stopped.
+ final UUID queryId = UUID.randomUUID();
+ final QueryEvent stoppedEvent = QueryEvent.stopped("rya", queryId);
+
+ // Release a latch if the stopQuery method on the queryExecutor is invoked with the correct values.
+ final CountDownLatch stopQueryInvoked = new CountDownLatch(1);
+ final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+ doAnswer(invocation -> {
+ stopQueryInvoked.countDown();
+ return null;
+ }).when(queryExecutor).stopQuery(queryId);
+
+ final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+ queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ try {
+ // The thread that will perform the QueryEventWorker task.
+ queryEventWorker.start();
+
+ // Provide a message indicating a query needs to be executing.
+ queue.put(stoppedEvent);
+
+ // Verify the Query Executor was told to stop the query.
+ assertTrue( stopQueryInvoked.await(150, TimeUnit.MILLISECONDS) );
+ } finally {
+ shutdownSignal.set(true);
+ queryEventWorker.join();
+ }
+ }
+
+ @Test
+ public void stopAllWork() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to send the execute work to the thread.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The message that indicates all queries for a rya instance need to be stopped.
+ final String ryaInstance = "rya";
+ final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
+
+ // Release a latch if the stopQuery method on the queryExecutor is invoked with the correct values.
+ final CountDownLatch testMethodInvoked = new CountDownLatch(1);
+ final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+ doAnswer(invocation -> {
+ testMethodInvoked.countDown();
+ return null;
+ }).when(queryExecutor).stopAll(ryaInstance);
+
+ final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+ queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ try {
+ // The thread that will perform the QueryEventWorker task.
+ queryEventWorker.start();
+
+ // Provide a message indicating a query needs to be executing.
+ queue.put(stopAllEvent);
+
+ // Verify the Query Executor was told to stop all the queries.
+ assertTrue( testMethodInvoked.await(150, TimeUnit.MILLISECONDS) );
+ } finally {
+ shutdownSignal.set(true);
+ queryEventWorker.join();
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
index a1203a0..04e70c0 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
@@ -1,18 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.rya.streams.querymanager;
@@ -34,13 +36,10 @@
import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
import org.junit.Test;
-import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
-
/**
- * Test for the {@link QueryManager}
+ * Unit tests the methods of {@link QueryManager}.
*/
public class QueryManagerTest {
- private static final Scheduler TEST_SCHEDULER = Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS);
/**
* Tests when the query manager is notified to create a new query, the query
@@ -74,7 +73,7 @@
return null;
}).when(source).subscribe(any(SourceListener.class));
- final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+ final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
try {
qm.startAndWait();
queryStarted.await(5, TimeUnit.SECONDS);
@@ -128,7 +127,7 @@
return null;
}).when(source).subscribe(any(SourceListener.class));
- final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+ final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
try {
qm.startAndWait();
queryDeleted.await(5, TimeUnit.SECONDS);
@@ -183,7 +182,7 @@
return null;
}).when(source).subscribe(any(SourceListener.class));
- final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+ final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
try {
qm.startAndWait();
queryDeleted.await(10, TimeUnit.SECONDS);
@@ -192,4 +191,4 @@
qm.stopAndWait();
}
}
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
new file mode 100644
index 0000000..9896e31
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Utilities that are useful for interacting with {@link Thread}s while testing.
+ */
+public class ThreadUtil {
+
+ /**
+ * Private constructor to prevent instantiation.
+ */
+ private ThreadUtil() { }
+
+ /**
+ * A utility function that returns whether a thread is alive or not after waiting
+ * some specified period of time to join it.
+ *
+ * @param thread - The thread that will be joined. (not null)
+ * @param millis - How long to wait to join the thread.
+ * @return {@code true} if the thread is still alive, otherwise {@code false}.
+ */
+ public static boolean stillAlive(final Thread thread, final long millis) {
+ requireNonNull(thread);
+ try {
+ thread.join(millis);
+ } catch (final InterruptedException e) { }
+ return thread.isAlive();
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index 3cbe894..f9c8a03 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -35,6 +35,7 @@
import org.apache.rya.streams.kafka.KafkaStreamsFactory;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
@@ -119,9 +120,10 @@
expected.add(new VisibilityBindingSet(bs, "a"));
// Start the executor that will be tested.
+ final CreateKafkaTopic createKafkaTopic = new CreateKafkaTopic( kafka.getZookeeperServers() );
final String kafkaServers = kafka.getKafkaHostname() + ":" + kafka.getKafkaPort();
final KafkaStreamsFactory jobFactory = new SingleThreadKafkaStreamsFactory(kafkaServers);
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(createKafkaTopic, jobFactory);
executor.startAndWait();
try {
// Start the query.
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
index 0df5794..c0f888e 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
@@ -32,6 +32,7 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
import org.apache.rya.streams.querymanager.QueryExecutor;
import org.junit.Test;
@@ -44,7 +45,7 @@
@Test(expected = IllegalStateException.class)
public void startQuery_serviceNotStarted() throws Exception {
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true));
}
@@ -60,7 +61,7 @@
when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Tell the executor to start the query.
@@ -75,14 +76,14 @@
@Test(expected = IllegalStateException.class)
public void stopQuery_serviceNotStarted() throws Exception {
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.stopQuery(UUID.randomUUID());
}
@Test
public void stopQuery_queryNotRunning() throws Exception {
// Start an executor.
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.startAndWait();
try {
// Try to stop a query that was never stareted.
@@ -104,7 +105,7 @@
when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Tell the executor to start the query.
@@ -122,7 +123,7 @@
@Test(expected = IllegalStateException.class)
public void stopAll_serviceNotStarted() throws Exception {
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.stopAll("rya");
}
@@ -141,7 +142,7 @@
when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(queryJob2);
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Tell the executor to start the queries.
@@ -180,7 +181,7 @@
when(jobFactory.make(eq(ryaInstance2), eq(query2))).thenReturn(queryJob2);
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Tell the executor to start the queries.
@@ -205,14 +206,14 @@
@Test(expected = IllegalStateException.class)
public void getRunningQueryIds_serviceNotStarted() throws Exception {
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.getRunningQueryIds();
}
@Test
public void getRunningQueryIds_noneStarted() throws Exception {
// Start an executor.
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.startAndWait();
try {
// Get the list of running queries.
@@ -240,7 +241,7 @@
when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Start the queries.
@@ -275,7 +276,7 @@
when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Start the queries.
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
index f2b50ab..de6b9f3 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
@@ -45,6 +45,11 @@
" <port>6</port>\n" +
" </kafka>\n" +
" </queryChangeLogSource>\n" +
+ " <queryExecutor>\n" +
+ " <localKafkaStreams>\n" +
+ " <zookeepers>zoo1,zoo2,zoo3</zookeepers>\n" +
+ " </localKafkaStreams>\n" +
+ " </queryExecutor>\n" +
" <performanceTunning>\n" +
" <queryChanngeLogDiscoveryPeriod>\n" +
" <value>1</value>\n" +