RYA-456 Implement a Single Node implementation of QueryExecutor.
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 7e3b8bc..5d63f32 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -91,7 +91,8 @@
}
@After
- public void cleanup() throws Exception{
+ public void cleanup() throws Exception {
+ queryRepo.stopAndWait();
stmtProducer.close();
resultConsumer.close();
}
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaStreamsFactory.java
new file mode 100644
index 0000000..bd8ff1e
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaStreamsFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Builds {@link KafkaStreams} objects that are able to process a specific {@link StreamsQuery}.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface KafkaStreamsFactory {
+
+ /**
+ * Builds a {@link KafkaStreams} object that is able to process a specific {@link StreamsQuery}.
+ *
+ * @param ryaInstance - The Rya Instance the streams job is for. (not null)
+ * @param query - Defines the query that will be executed. (not null)
+ * @return A {@link KafkaStreams} object that will process the provided query.
+ * @throws KafkaStreamsFactoryException Unable to create a {@link KafkaStreams} object from the provided values.
+ */
+ public KafkaStreams make(String ryaInstance, StreamsQuery query) throws KafkaStreamsFactoryException;
+
+ /**
+ * A {@link KafkaStreamsFactory} could not create a {@link KafkaStreams} object.
+ */
+ public static class KafkaStreamsFactoryException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public KafkaStreamsFactoryException(final String message) {
+ super(message);
+ }
+
+ public KafkaStreamsFactoryException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
new file mode 100644
index 0000000..7ab7e90
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
+import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory.TopologyBuilderException;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.openrdf.query.MalformedQueryException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates {@link KafkaStreams} objects that are able to process {@link StreamsQuery}s
+ * using a single thread of execution starting from the earliest point in within the
+ * input topic. The Application ID used by the client is based on the Query ID of the
+ * query that is being executed so that this job may resume where it left off if it
+ * is stopped.
+ */
+@DefaultAnnotation(NonNull.class)
+public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory {
+
+ private final TopologyBuilderFactory topologyFactory = new TopologyFactory();
+
+ private final String bootstrapServersConfig;
+
+ /**
+ * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}.
+ *
+ * @param bootstrapServersConfig - Configures which Kafka cluster the jobs will interact with. (not null)
+ */
+ public SingleThreadKafkaStreamsFactory(final String bootstrapServersConfig) {
+ this.bootstrapServersConfig = requireNonNull(bootstrapServersConfig);
+ }
+
+ @Override
+ public KafkaStreams make(final String ryaInstance, final StreamsQuery query) throws KafkaStreamsFactoryException {
+ requireNonNull(ryaInstance);
+ requireNonNull(query);
+
+ // Setup the Kafka Stream program.
+ final Properties streamsProps = new Properties();
+
+ // Configure the Kafka servers that will be talked to.
+ streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+
+ // Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run.
+ streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "RyaStreams-Query-" + query.getQueryId());
+
+ // Always start at the beginning of the input topic.
+ streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ // Setup the topology that processes the Query.
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId());
+
+ try {
+ final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory());
+ return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));
+ } catch (MalformedQueryException | TopologyBuilderException e) {
+ throw new KafkaStreamsFactoryException("Could not create a KafkaStreams processing topology for query " + query.getQueryId(), e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java
index 4572f08..bcee796 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -35,36 +35,41 @@
*/
@DefaultAnnotation(NonNull.class)
public interface QueryExecutor extends Service {
+
/**
* Starts running a {@link StreamsQuery}.
*
* @param ryaInstanceName - The rya instance whose {@link Statement}s will be processed by the query. (not null)
* @param query - The query to run. (not null)
* @throws QueryExecutorException When the query fails to start.
+ * @throws IllegalStateException The service has not been started yet.
*/
- public void startQuery(final String ryaInstanceName, final StreamsQuery query) throws QueryExecutorException;
+ public void startQuery(final String ryaInstanceName, final StreamsQuery query) throws QueryExecutorException, IllegalStateException;
/**
* Stops a {@link StreamsQuery}.
*
* @param queryID - The ID of the query to stop. (not null)
* @throws QueryExecutorException When the query fails to stop.
+ * @throws IllegalStateException The service has not been started yet.
*/
- public void stopQuery(final UUID queryID) throws QueryExecutorException;
+ public void stopQuery(final UUID queryID) throws QueryExecutorException, IllegalStateException;
/**
* Stops all {@link StreamsQuery} belonging to a specific rya instance.
*
* @param ryaInstanceName - The name of the rya instance to stop all queries for. (not null)
* @throws QueryExecutorException When the queries fails to stop.
+ * @throws IllegalStateException The service has not been started yet.
*/
- public void stopAll(final String ryaInstanceName) throws QueryExecutorException;
+ public void stopAll(final String ryaInstanceName) throws QueryExecutorException, IllegalStateException;
/**
- * @return - A set of {@link UUID}s representing the current active queries.
+ * @return A set of {@link UUID}s representing the current active queries.
* @throws QueryExecutorException Can't discover which queries are currently running.
+ * @throws IllegalStateException The service has not been started yet.
*/
- public Set<UUID> getRunningQueryIds() throws QueryExecutorException;
+ public Set<UUID> getRunningQueryIds() throws QueryExecutorException, IllegalStateException;
/**
* Exception to be used by {@link QueryExecutor} when queries fail to start or stop.
@@ -100,4 +105,4 @@
super(cause);
}
}
-}
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
new file mode 100644
index 0000000..947a215
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory.KafkaStreamsFactoryException;
+import org.apache.rya.streams.querymanager.QueryExecutor;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import kafka.consumer.KafkaStream;
+
+/**
+ * A {@link QueryExecutor} that runs a {@link KafkaStreams} job within its own JVM every
+ * time {@link #startQuery(String, StreamsQuery)} is invoked.
+ * <p/>
+ * This executor may run out of JVM resources if it is used to execute too many queries.
+ */
+@DefaultAnnotation(NonNull.class)
+public class LocalQueryExecutor extends AbstractIdleService implements QueryExecutor {
+
+ /**
+ * Provides thread safety when interacting with this class.
+ */
+ public static ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * Lookup the Rya Instance of a specific Query Id.
+ */
+ private final Map<UUID, String> ryaInstanceById = new HashMap<>();
+
+ /**
+ * Lookup the Query IDs that are running for a specific Rya Instance.
+ */
+ private final Multimap<String, UUID> idByRyaInstance = HashMultimap.create();
+
+ /**
+ * Lookup the executing {@link KafkaStreams} job for a running Query Id.
+ */
+ private final Map<UUID, KafkaStreams> byQueryId = new HashMap<>();
+
+ /**
+ * Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s.
+ */
+ private final KafkaStreamsFactory streamsFactory;
+
+ /**
+ * Constructs an instance of {@link LocalQueryExecutor}.
+ *
+ * @param streamsFactory - Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s. (not null)
+ */
+ public LocalQueryExecutor(final KafkaStreamsFactory streamsFactory) {
+ this.streamsFactory = requireNonNull(streamsFactory);
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ // Nothing to do.
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // Stop all of the running queries.
+ for(final KafkaStreams job : byQueryId.values()) {
+ job.close();
+ }
+ }
+
+ @Override
+ public void startQuery(final String ryaInstance, final StreamsQuery query) throws QueryExecutorException {
+ requireNonNull(ryaInstance);
+ requireNonNull(query);
+ checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method.");
+
+ lock.lock();
+ try {
+ // Setup the Kafka Streams job that will execute.
+ final KafkaStreams streams = streamsFactory.make(ryaInstance, query);
+ streams.start();
+
+ // Mark which Rya Instance the Query ID is for.
+ ryaInstanceById.put(query.getQueryId(), ryaInstance);
+
+ // Add the Query ID to the collection of running queries for the Rya instance.
+ idByRyaInstance.put(ryaInstance, query.getQueryId());
+
+ // Add the running Kafka Streams job for the Query ID.
+ byQueryId.put(query.getQueryId(), streams);
+
+ } catch (final KafkaStreamsFactoryException e) {
+ throw new QueryExecutorException("Could not start query " + query.getQueryId(), e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void stopQuery(final UUID queryId) throws QueryExecutorException {
+ requireNonNull(queryId);
+ checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method.");
+
+ lock.lock();
+ try {
+ if(byQueryId.containsKey(queryId)) {
+ // Stop the job from running.
+ final KafkaStreams streams = byQueryId.get(queryId);
+ streams.close();
+
+ // Remove it from the Rya Instance Name lookup.
+ final String ryaInstance = ryaInstanceById.remove(queryId);
+
+ // Remove it from the collection of running queries for the Rya Instance.
+ idByRyaInstance.remove(ryaInstance, queryId);
+
+ // Remove it from the running Kafka Streams job lookup.
+ byQueryId.remove(queryId);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void stopAll(final String ryaInstanceName) throws QueryExecutorException {
+ requireNonNull(ryaInstanceName);
+ checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method.");
+
+ lock.lock();
+ try {
+ if(idByRyaInstance.containsKey(ryaInstanceName)) {
+ // A defensive copy of the queries so that we may remove them from the maps.
+ final Set<UUID> queryIds = new HashSet<>( idByRyaInstance.get(ryaInstanceName) );
+
+ // Stop each of them.
+ for(final UUID queryId : queryIds) {
+ stopQuery(queryId);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Set<UUID> getRunningQueryIds() throws QueryExecutorException {
+ lock.lock();
+ checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method.");
+
+ try {
+ return new HashSet<>( byQueryId.keySet() );
+ } finally {
+ lock.unlock();
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
new file mode 100644
index 0000000..3cbe894
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.interactor.LoadStatements;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.streams.querymanager.QueryExecutor;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests the methods of {@link LocalQueryExecutor}.
+ */
+public class LocalQueryExecutorIT {
+
+ private final String ryaInstance = UUID.randomUUID().toString();
+
+ private Producer<String, VisibilityStatement> stmtProducer = null;
+ private Consumer<String, VisibilityBindingSet> resultConsumer = null;
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
+
+ @Before
+ public void setup() {
+ // Make sure the topic that the change log uses exists.
+ final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+ kafka.createTopic(changeLogTopic);
+
+ // Initialize the Statements Producer and the Results Consumer.
+ stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
+ resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ stmtProducer.close();
+ resultConsumer.close();
+ }
+
+ @Test
+ public void runQuery() throws Exception {
+ // Test values.
+ final String ryaInstance = "rya";
+ final StreamsQuery sQuery = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
+
+ // Create the statements that will be loaded.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(vf.createStatement(
+ vf.createURI("urn:Alice"),
+ vf.createURI("urn:worksAt"),
+ vf.createURI("urn:BurgerJoint")), "a"));
+ statements.add(new VisibilityStatement(vf.createStatement(
+ vf.createURI("urn:Bob"),
+ vf.createURI("urn:worksAt"),
+ vf.createURI("urn:TacoShop")), "a"));
+ statements.add(new VisibilityStatement(vf.createStatement(
+ vf.createURI("urn:Charlie"),
+ vf.createURI("urn:worksAt"),
+ vf.createURI("urn:TacoShop")), "a"));
+
+ // Create the expected results.
+ final List<VisibilityBindingSet> expected = new ArrayList<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Bob"));
+ bs.addBinding("business", vf.createURI("urn:TacoShop"));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Charlie"));
+ bs.addBinding("business", vf.createURI("urn:TacoShop"));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+
+ // Start the executor that will be tested.
+ final String kafkaServers = kafka.getKafkaHostname() + ":" + kafka.getKafkaPort();
+ final KafkaStreamsFactory jobFactory = new SingleThreadKafkaStreamsFactory(kafkaServers);
+ final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ executor.startAndWait();
+ try {
+ // Start the query.
+ executor.startQuery(ryaInstance, sQuery);
+
+ // Wait for the program to start.
+ Thread.sleep(5000);
+
+ // Write some statements to the program.
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final LoadStatements loadStatements = new KafkaLoadStatements(statementsTopic, stmtProducer);
+ loadStatements.fromCollection(statements);
+
+ // Read the output of the streams program.
+ final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+ resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
+ final List<VisibilityBindingSet> results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
+ assertEquals(expected, results);
+
+ } finally {
+ executor.stopAndWait();
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
new file mode 100644
index 0000000..0df5794
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.querymanager.QueryExecutor;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link LocalQueryExecutor}.
+ */
+public class LocalQueryExecutorTest {
+
+ @Test(expected = IllegalStateException.class)
+ public void startQuery_serviceNotStarted() throws Exception {
+ final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true));
+ }
+
+ @Test
+ public void startQuery() throws Exception {
+ // Test values.
+ final String ryaInstance = "rya";
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+ // Mock the streams factory so that we can tell if the start function is invoked by the executor.
+ final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+ final KafkaStreams queryJob = mock(KafkaStreams.class);
+ when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
+
+ // Start the executor that will be tested.
+ final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ executor.startAndWait();
+ try {
+ // Tell the executor to start the query.
+ executor.startQuery(ryaInstance, query);
+
+ // Show a job was started for that query's ID.
+ verify(queryJob).start();
+ } finally {
+ executor.stopAndWait();
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void stopQuery_serviceNotStarted() throws Exception {
+ final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ executor.stopQuery(UUID.randomUUID());
+ }
+
+ @Test
+ public void stopQuery_queryNotRunning() throws Exception {
+ // Start an executor.
+ final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ executor.startAndWait();
+ try {
+ // Try to stop a query that was never stareted.
+ executor.stopQuery(UUID.randomUUID());
+ } finally {
+ executor.stopAndWait();
+ }
+ }
+
+ @Test
+ public void stopQuery() throws Exception {
+ // Test values.
+ final String ryaInstance = "rya";
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+ // Mock the streams factory so that we can tell if the stop function is invoked by the executor.
+ final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+ final KafkaStreams queryJob = mock(KafkaStreams.class);
+ when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
+
+ // Start the executor that will be tested.
+ final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ executor.startAndWait();
+ try {
+ // Tell the executor to start the query.
+ executor.startQuery(ryaInstance, query);
+
+ // Tell the executor to stop the query.
+ executor.stopQuery(query.getQueryId());
+
+ // Show a job was stopped for that query's ID.
+ verify(queryJob).close();
+ } finally {
+ executor.stopAndWait();
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void stopAll_serviceNotStarted() throws Exception {
+ final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ executor.stopAll("rya");
+ }
+
+ @Test
+ public void stopAll_noneForThatRyaInstance() throws Exception {
+ // Test values.
+ final String ryaInstance = "rya";
+ final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+ // Mock the streams factory so that we can tell if the stop function is invoked by the executor.
+ final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+ final KafkaStreams queryJob1 = mock(KafkaStreams.class);
+ final KafkaStreams queryJob2 = mock(KafkaStreams.class);
+ when(jobFactory.make(eq(ryaInstance), eq(query1))).thenReturn(queryJob1);
+ when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(queryJob2);
+
+ // Start the executor that will be tested.
+ final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ executor.startAndWait();
+ try {
+ // Tell the executor to start the queries.
+ executor.startQuery(ryaInstance, query1);
+ executor.startQuery(ryaInstance, query2);
+
+ // Verify both are running.
+ verify(queryJob1).start();
+ verify(queryJob2).start();
+
+ // Tell the executor to stop queries running under rya2.
+ executor.stopAll("someOtherRyaInstance");
+
+ // Show none of the queries were stopped.
+ verify(queryJob1, never()).close();
+ verify(queryJob2, never()).close();
+
+ } finally {
+ executor.stopAndWait();
+ }
+ }
+
+ @Test
+ public void stopAll() throws Exception {
+ // Test values.
+ final String ryaInstance1 = "rya1";
+ final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final String ryaInstance2 = "rya2";
+ final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+ // Mock the streams factory so that we can tell if the stop function is invoked by the executor.
+ final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+ final KafkaStreams queryJob1 = mock(KafkaStreams.class);
+ when(jobFactory.make(eq(ryaInstance1), eq(query1))).thenReturn(queryJob1);
+ final KafkaStreams queryJob2 = mock(KafkaStreams.class);
+ when(jobFactory.make(eq(ryaInstance2), eq(query2))).thenReturn(queryJob2);
+
+ // Start the executor that will be tested.
+ final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ executor.startAndWait();
+ try {
+ // Tell the executor to start the queries.
+ executor.startQuery(ryaInstance1, query1);
+ executor.startQuery(ryaInstance2, query2);
+
+ // Verify both are running.
+ verify(queryJob1).start();
+ verify(queryJob2).start();
+
+ // Tell the executor to stop queries running under rya2.
+ executor.stopAll(ryaInstance2);
+
+ // Show the first query is still running, but the second isn't.
+ verify(queryJob1, never()).close();
+ verify(queryJob2).close();
+
+ } finally {
+ executor.stopAndWait();
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void getRunningQueryIds_serviceNotStarted() throws Exception {
+ final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ executor.getRunningQueryIds();
+ }
+
+ @Test
+ public void getRunningQueryIds_noneStarted() throws Exception {
+ // Start an executor.
+ final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ executor.startAndWait();
+ try {
+ // Get the list of running queries.
+ final Set<UUID> runningQueries = executor.getRunningQueryIds();
+
+ // Show no queries are reported as running.
+ assertTrue(runningQueries.isEmpty());
+ } finally {
+ executor.stopAndWait();
+ }
+ }
+
+ @Test
+ public void getRunningQueryIds_noneStopped() throws Exception {
+ // Test values.
+ final String ryaInstance = "rya";
+ final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+ // Mock the streams factory so that we can figure out what is started.
+ final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+ when(jobFactory.make(eq(ryaInstance), eq(query1))).thenReturn(mock(KafkaStreams.class));
+ when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(mock(KafkaStreams.class));
+ when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
+
+ // Start the executor that will be tested.
+ final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ executor.startAndWait();
+ try {
+ // Start the queries.
+ executor.startQuery(ryaInstance, query1);
+ executor.startQuery(ryaInstance, query2);
+ executor.startQuery(ryaInstance, query3);
+
+ // All of those query IDs should be reported as running.
+ final Set<UUID> expected = Sets.newHashSet(
+ query1.getQueryId(),
+ query2.getQueryId(),
+ query3.getQueryId());
+ assertEquals(expected, executor.getRunningQueryIds());
+
+ } finally {
+ executor.stopAndWait();
+ }
+ }
+
+ @Test
+ public void getRunningQueryIds_stoppedNoLongerListed() throws Exception {
+ // Test values.
+ final String ryaInstance = "rya";
+ final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+ // Mock the streams factory so that we can figure out what is started.
+ final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+ when(jobFactory.make(eq(ryaInstance), eq(query1))).thenReturn(mock(KafkaStreams.class));
+ when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(mock(KafkaStreams.class));
+ when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
+
+ // Start the executor that will be tested.
+ final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ executor.startAndWait();
+ try {
+ // Start the queries.
+ executor.startQuery(ryaInstance, query1);
+ executor.startQuery(ryaInstance, query2);
+ executor.startQuery(ryaInstance, query3);
+
+ // Stop the second query.
+ executor.stopQuery(query2.getQueryId());
+
+ // Only the first and third queries are running.
+ final Set<UUID> expected = Sets.newHashSet(
+ query1.getQueryId(),
+ query3.getQueryId());
+ assertEquals(expected, executor.getRunningQueryIds());
+
+ } finally {
+ executor.stopAndWait();
+ }
+ }
+}
\ No newline at end of file