RYA-463 RYA-464 Added an isInsert flag to StreamsQuery to indicate when the results of a query need to be inserted back into Rya.
Conflicts:
common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java b/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
index 2fbd09b..a6c93e9 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
@@ -100,4 +100,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java b/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
index fbe2ebe..bedc59a 100644
--- a/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
+++ b/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
@@ -147,4 +147,4 @@
public void isInsert_false_malformed() throws MalformedQueryException {
assertFalse( QueryInvestigator.isInsertWhere("not sparql") );
}
-}
\ No newline at end of file
+}
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
index 11423bd..76cf6af 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
@@ -35,6 +35,7 @@
private final UUID queryId;
private final String sparql;
private final boolean isActive;
+ private final boolean isInsert;
/**
* Constructs an instance of {@link StreamsQuery}.
@@ -42,11 +43,18 @@
* @param queryId - Uniquely identifies the query within Rya Streams. (not null)
* @param sparql - The SPARQL query that defines how statements will be processed. (not null)
* @param isActive - {@code true} if Rya Streams should process this query; otherwise {@code false}.
+ * @param isInsert - {@code true} if Rya Streams should insert the results of the query back into
+ * the Rya instance the statements originated from; otherwise {@code false}.
*/
- public StreamsQuery(final UUID queryId, final String sparql, final boolean isActive) {
+ public StreamsQuery(
+ final UUID queryId,
+ final String sparql,
+ final boolean isActive,
+ final boolean isInsert) {
this.queryId = requireNonNull(queryId);
this.sparql = requireNonNull(sparql);
this.isActive = isActive;
+ this.isInsert = isInsert;
}
/**
@@ -70,9 +78,17 @@
return isActive;
}
+ /**
+ * @return {@code true} if Rya Streams should insert the results of the query back into
+ * the Rya instance the statements originated from; otherwise {@code false}.
+ */
+ public boolean isInsert() {
+ return isInsert;
+ }
+
@Override
public int hashCode() {
- return Objects.hash(queryId, sparql, isActive);
+ return Objects.hash(queryId, sparql, isActive, isInsert);
}
@Override
@@ -81,23 +97,17 @@
final StreamsQuery other = (StreamsQuery) o;
return Objects.equals(queryId, other.queryId) &&
Objects.equals(sparql, other.sparql) &&
- isActive == other.isActive;
+ isActive == other.isActive &&
+ isInsert == other.isInsert;
}
return false;
}
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("ID: ");
- sb.append(getQueryId().toString() + "\n");
- sb.append("Query: ");
- sb.append(getSparql() + "\n");
- sb.append("Is ");
- if (!isActive) {
- sb.append("Not ");
- }
- sb.append("Running.\n");
- return sb.toString();
+ return "ID: " + queryId + "\n" +
+ "Query: " + sparql + "\n" +
+ "Is Active: " + isActive + "\n" +
+ "Is Insert: " + isInsert + "\n";
}
-}
\ No newline at end of file
+}
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
index 9889fd0..842399f 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
@@ -36,8 +36,10 @@
* @param query - The SPARQL query that will be added. (not null)
* @param isActive - {@code true} if the query needs to be maintained by
* Rya Streams; otherwise {@code false}.
+ * @param isInsert - {@code true} if the query's reuslts need to be inserted into
+ * the Rya instance that originated the statements; otherwise {@code false}.
* @return The {@link StreamsQuery} used by Rya Streams for this query.
* @throws RyaStreamsException The query could not be added to Rya Streams.
*/
- public StreamsQuery addQuery(final String query, boolean isActive) throws RyaStreamsException;
+ public StreamsQuery addQuery(final String query, boolean isActive, boolean isInsert) throws RyaStreamsException;
}
\ No newline at end of file
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
index edd90fd..67edec0 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
@@ -49,7 +49,7 @@
}
@Override
- public StreamsQuery addQuery(final String query, final boolean isActive) throws RyaStreamsException {
+ public StreamsQuery addQuery(final String query, final boolean isActive, final boolean isInsert) throws RyaStreamsException {
requireNonNull(query);
// Make sure the SPARQL is valid.
@@ -60,6 +60,6 @@
}
// If it is, then store it in the repository.
- return repository.add(query, isActive);
+ return repository.add(query, isActive, isInsert);
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index 95c1922..c71f0f8 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -92,7 +92,7 @@
}
@Override
- public StreamsQuery add(final String query, final boolean isActive)
+ public StreamsQuery add(final String query, final boolean isActive, final boolean isInsert)
throws QueryRepositoryException, IllegalStateException {
requireNonNull(query);
@@ -101,7 +101,7 @@
checkState();
// First record the change to the log.
final UUID queryId = UUID.randomUUID();
- final QueryChange change = QueryChange.create(queryId, query, isActive);
+ final QueryChange change = QueryChange.create(queryId, query, isActive, isInsert);
changeLog.write(change);
// Update the cache to represent what is currently in the log.
@@ -235,7 +235,8 @@
final StreamsQuery query = new StreamsQuery(
queryId,
change.getSparql().get(),
- change.getIsActive().get());
+ change.getIsActive().get(),
+ change.getIsInsert().get());
queriesCache.put(queryId, query);
break;
@@ -245,7 +246,8 @@
final StreamsQuery updated = new StreamsQuery(
old.getQueryId(),
old.getSparql(),
- change.getIsActive().get());
+ change.getIsActive().get(),
+ old.isInsert());
queriesCache.put(queryId, updated);
}
break;
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
index d34a394..fb58844 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
@@ -42,6 +42,7 @@
private final ChangeType changeType;
private final Optional<String> sparql;
private final Optional<Boolean> isActive;
+ private final Optional<Boolean> isInsert;
/**
* Constructs an instance of {@link QueryChange}. Use the {@link #create(UUID, String)} or {@link #delete(UUID)}
@@ -52,16 +53,20 @@
* @param sparql - If this is a create change, then the SPARQL query that will be evaluated within Rya Streams. (not null)
* @param isActive - If this is a create or update change, then the active state that defines if the
* query will be evaluated by RyaStreams. (not null)
+ * @param isInsert - If this is a create change, then the insert state that defines if the
+ * results of the query will be inserted back into the originating Rya store. (not null)
*/
private QueryChange(
final UUID queryId,
final ChangeType changeType,
final Optional<String> sparql,
- final Optional<Boolean> isActive) {
+ final Optional<Boolean> isActive,
+ final Optional<Boolean> isInsert) {
this.queryId = requireNonNull(queryId);
this.changeType = requireNonNull(changeType);
this.sparql = requireNonNull(sparql);
this.isActive = requireNonNull(isActive);
+ this.isInsert = requireNonNull(isInsert);
}
/**
@@ -93,9 +98,17 @@
return isActive;
}
+ /**
+ * @return If this is a create change, then the insert state that defines if the
+ * results of the query will be inserted back into the originating Rya store.
+ */
+ public Optional<Boolean> getIsInsert() {
+ return isInsert;
+ }
+
@Override
public int hashCode() {
- return Objects.hash(queryId, changeType, sparql, isActive);
+ return Objects.hash(queryId, changeType, sparql, isActive, isInsert);
}
@Override
@@ -105,7 +118,8 @@
return Objects.equals(queryId, change.queryId) &&
Objects.equals(changeType, change.changeType) &&
Objects.equals(sparql, change.sparql) &&
- Objects.equals(isActive, change.isActive);
+ Objects.equals(isActive, change.isActive) &&
+ Objects.equals(isInsert, change.isInsert);
}
return false;
}
@@ -116,6 +130,7 @@
" Query ID: " + queryId + ",\n" +
" Change Type: " + changeType + ",\n" +
" Is Active: " + isActive + ",\n" +
+ " Is Insert: " + isInsert + ",\n" +
" SPARQL: " + sparql + "\n" +
"}";
}
@@ -125,22 +140,24 @@
*
* @param queryId - Uniquely identifies the query within the streaming system. (not null)
* @param sparql - The query that will be evaluated. (not null)
- * @param isActive - The active state that defines if the query will be evaluated by RyaStreams. (not null)
+ * @param isActive - The active state that defines if the query will be evaluated by RyaStreams.
+ * @param isInsert - The insert state that defines if the query's results will be inserted back
+ * into the Rya instance the originating statements came from.
* @return A {@link QueryChange} built using the provided values.
*/
- public static QueryChange create(final UUID queryId, final String sparql, final boolean isActive) {
- return new QueryChange(queryId, ChangeType.CREATE, Optional.of(sparql), Optional.of(isActive));
+ public static QueryChange create(final UUID queryId, final String sparql, final boolean isActive, final boolean isInsert) {
+ return new QueryChange(queryId, ChangeType.CREATE, Optional.of(sparql), Optional.of(isActive), Optional.of(isInsert));
}
/**
* Create a {@link QueryChange} that represents a query in Rya Streams whose active state has changed.
*
* @param queryId - Uniquely identifies the query within the streaming system. (not null)
- * @param isActive - The active state that defines if the query will be evaluated by RyaStreams. (not null)
+ * @param isActive - The active state that defines if the query will be evaluated by RyaStreams.
* @return A {@link QueryChange} built using the provided values.
*/
public static QueryChange update(final UUID queryId, final boolean isActive) {
- return new QueryChange(queryId, ChangeType.UPDATE, Optional.absent(), Optional.of(isActive));
+ return new QueryChange(queryId, ChangeType.UPDATE, Optional.absent(), Optional.of(isActive), Optional.absent());
}
/**
@@ -150,7 +167,7 @@
* @return A {@link QueryChange} built using the provided values.
*/
public static QueryChange delete(final UUID queryId) {
- return new QueryChange(queryId, ChangeType.DELETE, Optional.absent(), Optional.absent());
+ return new QueryChange(queryId, ChangeType.DELETE, Optional.absent(), Optional.absent(), Optional.absent());
}
/**
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
index 4d8b2db..e4bcd7f 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -46,11 +46,13 @@
* @param query - The SPARQL query to add. (not null)
* @param isActive - {@code true} if the query should be processed after it is added
* otherwise {@code false}.
+ * @param isInsert - {@code true} if the query's results should be inserted back into
+ * the Rya instance the originating statements came from; otherwise {@code false}.
* @return The {@link StreamsQuery} used in Rya Streams.
* @throws QueryRepositoryException Could not add the query.
* @throws IllegalStateException The Service has not been started, but has been subscribed to.
*/
- public StreamsQuery add(final String query, boolean isActive) throws QueryRepositoryException, IllegalStateException;
+ public StreamsQuery add(final String query, boolean isActive, boolean isInsert) throws QueryRepositoryException, IllegalStateException;
/**
* Updates the isActive state of a {@link StreamsQuery}. Setting this value to {@code true}
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
index 77a0a15..1a6ea88 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
@@ -43,10 +43,10 @@
final AddQuery addQuery = new DefaultAddQuery(repo);
// Add the query.
- addQuery.addQuery(sparql, true);
+ addQuery.addQuery(sparql, true, true);
// Verify the call was forwarded to the repository.
- verify(repo, times(1)).add(eq(sparql), eq(true));
+ verify(repo, times(1)).add(eq(sparql), eq(true), eq(true));
}
@Test(expected = RyaStreamsException.class)
@@ -59,6 +59,6 @@
final AddQuery addQuery = new DefaultAddQuery(repo);
// Add the query.
- addQuery.addQuery(sparql, true);
+ addQuery.addQuery(sparql, true, true);
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index d7e116b..5a16e79 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -50,9 +50,9 @@
final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
// Add some queries to it.
final Set<StreamsQuery> expected = new HashSet<>();
- expected.add( queries.add("query 1", true) );
- expected.add( queries.add("query 2", false) );
- expected.add( queries.add("query 3", true) );
+ expected.add( queries.add("query 1", true, true) );
+ expected.add( queries.add("query 2", false, true) );
+ expected.add( queries.add("query 3", true, false) );
// Show they are in the list of all queries.
final Set<StreamsQuery> stored = queries.list();
@@ -65,9 +65,9 @@
final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
// Add some queries to it. The second one we will delete.
final Set<StreamsQuery> expected = new HashSet<>();
- expected.add( queries.add("query 1", true) );
- final UUID deletedMeId = queries.add("query 2", false).getQueryId();
- expected.add( queries.add("query 3", true) );
+ expected.add( queries.add("query 1", true, true) );
+ final UUID deletedMeId = queries.add("query 2", false, true).getQueryId();
+ expected.add( queries.add("query 3", true, false) );
// Delete the second query.
queries.delete( deletedMeId );
@@ -86,16 +86,20 @@
queries.startAndWait();
// Add some queries and deletes to it.
final Set<StreamsQuery> expected = new HashSet<>();
- expected.add( queries.add("query 1", true) );
- final UUID deletedMeId = queries.add("query 2", false).getQueryId();
- expected.add( queries.add("query 3", true) );
+ expected.add( queries.add("query 1", true, true) );
+ final UUID deletedMeId = queries.add("query 2", false, true).getQueryId();
+ expected.add( queries.add("query 3", true, false) );
queries.delete( deletedMeId );
// Create a new totally in memory QueryRepository.
final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE );
- // Listing the queries should work using an initialized change log.
- final Set<StreamsQuery> stored = initializedQueries.list();
- assertEquals(expected, stored);
+ try {
+ // Listing the queries should work using an initialized change log.
+ final Set<StreamsQuery> stored = initializedQueries.list();
+ assertEquals(expected, stored);
+ } finally {
+ queries.stop();
+ }
} finally {
queries.stop();
}
@@ -117,7 +121,7 @@
// Setup a totally in memory QueryRepository.
final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
// Add a query to it.
- final StreamsQuery query = queries.add("query 1", true);
+ final StreamsQuery query = queries.add("query 1", true, false);
// Show the fetched query matches the expected ones.
final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
@@ -140,14 +144,14 @@
// Setup a totally in memory QueryRepository.
final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
// Add a query to it.
- final StreamsQuery query = queries.add("query 1", true);
+ final StreamsQuery query = queries.add("query 1", true, false);
// Change the isActive state of that query.
queries.updateIsActive(query.getQueryId(), false);
// Show the fetched query matches the expected one.
final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
- final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false);
+ final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false, false);
assertEquals(expected, fetched.get());
}
@@ -159,13 +163,13 @@
queries.startAndWait();
// Add a query to it.
- final StreamsQuery query = queries.add("query 1", true);
+ final StreamsQuery query = queries.add("query 1", true, false);
final Set<StreamsQuery> existing = queries.subscribe((queryChangeEvent, newQueryState) -> {
final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(1L,
- QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+ QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
final Optional<StreamsQuery> expectedQueryState = Optional.of(
- new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+ new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
assertEquals(expected, queryChangeEvent);
assertEquals(expectedQueryState, newQueryState);
@@ -173,7 +177,7 @@
assertEquals(Sets.newHashSet(query), existing);
- queries.add("query 2", true);
+ queries.add("query 2", true, false);
} finally {
queries.stop();
}
@@ -194,9 +198,9 @@
final CountDownLatch repo1Latch = new CountDownLatch(1);
queries.subscribe((queryChangeEvent, newQueryState) -> {
final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
- QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+ QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
final Optional<StreamsQuery> expectedQueryState = Optional.of(
- new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+ new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
assertEquals(expected, queryChangeEvent);
assertEquals(expectedQueryState, newQueryState);
@@ -207,16 +211,16 @@
final CountDownLatch repo2Latch = new CountDownLatch(1);
queries2.subscribe((queryChangeEvent, newQueryState) -> {
final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
- QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+ QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
final Optional<StreamsQuery> expectedQueryState = Optional.of(
- new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+ new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
assertEquals(expected, queryChangeEvent);
assertEquals(expectedQueryState, newQueryState);
repo2Latch.countDown();
});
- queries.add("query 2", true);
+ queries.add("query 2", true, false);
assertTrue(repo1Latch.await(5, TimeUnit.SECONDS));
assertTrue(repo2Latch.await(5, TimeUnit.SECONDS));
@@ -233,6 +237,6 @@
final QueryRepository queries = new InMemoryQueryRepository(new InMemoryQueryChangeLog(), SCHEDULE);
queries.subscribe((queryChangeEvent, newQueryState) -> {});
- queries.add("query 2", true);
+ queries.add("query 2", true, false);
}
-}
\ No newline at end of file
+}
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
index 9273c33..3886a95 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
@@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit;
+import org.apache.rya.api.utils.QueryInvestigator;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.api.interactor.AddQuery;
@@ -32,6 +33,7 @@
import org.apache.rya.streams.client.RyaStreamsCommand;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
+import org.openrdf.query.MalformedQueryException;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
@@ -58,6 +60,9 @@
@Parameter(names = {"--isActive", "-a"}, required = false, description = "True if the added query will be started.")
private String isActive;
+ @Parameter(names = {"--isInsert", "-n"}, required = false, description = "True if the reuslts of the query will be written back to Rya.")
+ private String isInsert;
+
@Override
public String toString() {
final StringBuilder parameters = new StringBuilder();
@@ -67,6 +72,7 @@
parameters.append("\tQuery: " + query + "\n");
}
parameters.append("\tIs Active: " + isActive + "\n");
+ parameters.append("\tis Insert: " + isInsert + "\n");
return parameters.toString();
}
}
@@ -125,17 +131,33 @@
try {
final AddQuery addQuery = new DefaultAddQuery(queryRepo);
try {
- final StreamsQuery query = addQuery.addQuery(params.query, Boolean.parseBoolean(params.isActive));
+ final Boolean isActive = Boolean.parseBoolean(params.isActive);
+ final Boolean isInsert = Boolean.parseBoolean(params.isInsert);
+
+ // If the query's results are meant to be written back to Rya, make sure it creates statements.
+ if(isInsert) {
+ final boolean isConstructQuery = QueryInvestigator.isConstruct(params.query);
+ final boolean isInsertQuery = QueryInvestigator.isInsertWhere(params.query);
+
+ if(isConstructQuery) {
+ System.out.println(
+ "WARNING: CONSTRUCT is part of the SPARQL Query API, so they do not normally\n" +
+ "get written back to the triple store. Consider using an INSERT, which is\n" +
+ "part of the SPARQL Update API, in the future.");
+ }
+
+ if(!(isConstructQuery || isInsertQuery)) {
+ throw new ArgumentsException("Only CONSTRUCT queries and INSERT updates may be inserted back to the triple store.");
+ }
+ }
+
+ final StreamsQuery query = addQuery.addQuery(params.query, isActive, isInsert);
System.out.println("Added query: " + query.getSparql());
} catch (final RyaStreamsException e) {
- System.err.println("Unable to parse query: " + params.query);
- e.printStackTrace();
- System.exit(1);
+ throw new ExecutionException("Unable to add the query to Rya Streams.", e);
}
- } catch (final Exception e) {
- System.err.println("Problem encountered while closing the QueryRepository.");
- e.printStackTrace();
- System.exit(1);
+ } catch(final MalformedQueryException e) {
+ throw new ArgumentsException("Could not parse the provided query.", e);
}
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index a5507a6..3639aa9 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -23,7 +23,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.StringUtils;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.api.interactor.ListQueries;
@@ -116,9 +115,11 @@
sb.append("ID: ").append(query.getQueryId())
.append(" ")
.append("Is Active: ").append(query.isActive())
- .append(StringUtils.rightPad("" + query.isActive(), 9))
+ .append( query.isActive() ? " " : " " )
+ .append("Is Insert: ").append(query.isInsert())
+ .append(query.isInsert() ? " " : " ")
.append("Query: ").append(query.getSparql()).append("\n");
});
return sb.toString();
}
-}
\ No newline at end of file
+}
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index 3bfbadc..bbbcb2a 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -33,6 +33,7 @@
import org.apache.rya.streams.api.queries.QueryChange;
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.client.RyaStreamsCommand.ArgumentsException;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
@@ -78,7 +79,8 @@
"-i", kafka.getKafkaHostname(),
"-p", kafka.getKafkaPort(),
"-q", query,
- "-a", "true"
+ "-a", "true",
+ "-n", "false"
};
// Execute the command.
@@ -100,7 +102,8 @@
"--kafkaHostname", kafka.getKafkaHostname(),
"--kafkaPort", kafka.getKafkaPort(),
"--query", query,
- "--isActive", "true"
+ "--isActive", "true",
+ "--isInsert", "false"
};
// Execute the command.
@@ -112,4 +115,22 @@
assertEquals(1, queries.size());
assertEquals(query, queries.iterator().next().getSparql());
}
+
+ @Test(expected = ArgumentsException.class)
+ public void canNotInsertQueries() throws Exception {
+ // Arguments that add a query to Rya Streams.
+ final String query = "SELECT * WHERE { ?person <urn:name> ?name }";
+ final String[] args = new String[] {
+ "--ryaInstance", "" + ryaInstance,
+ "--kafkaHostname", kafka.getKafkaHostname(),
+ "--kafkaPort", kafka.getKafkaPort(),
+ "--query", query,
+ "--isActive", "true",
+ "--isInsert", "true"
+ };
+
+ // Execute the command.
+ final AddQueryCommand command = new AddQueryCommand();
+ command.execute(args);
+ }
}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index 7bec080..4cfdedb 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -74,9 +74,9 @@
@Test
public void shortParams() throws Exception {
// Add a few queries to Rya Streams.
- queryRepo.add("query1", true);
- final UUID query2Id = queryRepo.add("query2", false).getQueryId();
- queryRepo.add("query3", true);
+ queryRepo.add("query1", true, true);
+ final UUID query2Id = queryRepo.add("query2", false, true).getQueryId();
+ queryRepo.add("query3", true, false);
// Show that all three of the queries were added.
Set<StreamsQuery> queries = queryRepo.list();
@@ -105,9 +105,9 @@
@Test
public void longParams() throws Exception {
// Add a few queries to Rya Streams.
- queryRepo.add("query1", true);
- final UUID query2Id = queryRepo.add("query2", false).getQueryId();
- queryRepo.add("query3", true);
+ queryRepo.add("query1", true, true);
+ final UUID query2Id = queryRepo.add("query2", false, true).getQueryId();
+ queryRepo.add("query3", true, false);
// Show that all three of the queries were added.
Set<StreamsQuery> queries = queryRepo.list();
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index f6ceb75..e9f961a 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -68,9 +68,9 @@
@Test
public void shortParams() throws Exception {
// Add a few queries to Rya Streams.
- queryRepo.add("query1", true);
- queryRepo.add("query2", false);
- queryRepo.add("query3", true);
+ queryRepo.add("query1", true, true);
+ queryRepo.add("query2", false, true);
+ queryRepo.add("query3", true, false);
// Execute the List Queries command.
final String[] args = new String[] {
@@ -86,9 +86,9 @@
@Test
public void longParams() throws Exception {
// Add a few queries to Rya Streams.
- queryRepo.add("query1", true);
- queryRepo.add("query2", false);
- queryRepo.add("query3", true);
+ queryRepo.add("query1", true, true);
+ queryRepo.add("query2", false, true);
+ queryRepo.add("query3", true, false);
// Execute the List Queries command.
final String[] args = new String[] {
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 176b920..21a8e4c 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -116,7 +116,7 @@
@Test
public void runQuery() throws Exception {
// Register a query with the Query Repository.
- final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
+ final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true, false);
// Arguments that run the query we just registered with Rya Streams.
final String[] args = new String[] {
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
index c9abb41..4459057 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -88,7 +88,7 @@
final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS) );
// Add the query to the query repository.
- final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
+ final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true, false);
final UUID queryId = sQuery.getQueryId();
final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
index c2b821f..0dcd079 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
@@ -79,7 +79,7 @@
public void testWrite() throws Exception {
final String sparql = "SOME QUERY HERE";
final UUID uuid = UUID.randomUUID();
- final QueryChange newChange = QueryChange.create(uuid, sparql, true);
+ final QueryChange newChange = QueryChange.create(uuid, sparql, true, false);
changeLog.write(newChange);
consumer.subscribe(Lists.newArrayList(topic));
@@ -93,7 +93,7 @@
@Test
public void readSingleWrite() throws Exception {
// Write a single change to the log.
- final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true);
+ final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true, false);
changeLog.write(change);
// Read that entry from the log.
@@ -198,7 +198,7 @@
assertFalse( changeLog2.readFromStart().hasNext() );
// Write a change to the first log.
- final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true);
+ final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true, false);
changeLog.write(change);
// Show it's in the first log.
@@ -214,7 +214,7 @@
for (int ii = 0; ii < 10; ii++) {
final String sparql = "SOME QUERY HERE_" + ii;
final UUID uuid = UUID.randomUUID();
- final QueryChange newChange = QueryChange.create(uuid, sparql, true);
+ final QueryChange newChange = QueryChange.create(uuid, sparql, true, false);
changeLog.write(newChange);
changes.add(newChange);
}
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
index cb708ed..2cb543a 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
@@ -79,13 +79,13 @@
// Write a message that indicates a new query should be active.
final UUID firstQueryId = UUID.randomUUID();
- changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+ changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true, false));
// Write a message that adds an active query, but then makes it inactive. Because both of these
// events are written to the log before the worker subscribes to the repository for updates, they
// must result in a single query stopped event.
final UUID secondQueryId = UUID.randomUUID();
- changeLog.write(QueryChange.create(secondQueryId, "select * where { ?d ?e ?f . }", true));
+ changeLog.write(QueryChange.create(secondQueryId, "select * where { ?d ?e ?f . }", true, false));
changeLog.write(QueryChange.update(secondQueryId, false));
// Start the worker that will be tested.
@@ -103,7 +103,7 @@
// Query 2, stopped.
Set<QueryEvent> expectedEvents = new HashSet<>();
expectedEvents.add(QueryEvent.executing("rya",
- new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+ new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true, false)));
expectedEvents.add(QueryEvent.stopped("rya", secondQueryId));
Set<QueryEvent> queryEvents = new HashSet<>();
@@ -146,7 +146,7 @@
// Write a message that indicates a new query should be active.
final UUID firstQueryId = UUID.randomUUID();
- changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+ changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true, false));
// Start the worker that will be tested.
final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
@@ -165,7 +165,7 @@
// second message was effectively skipped as it would have add its work added twice otherwise.
final Set<QueryEvent> expectedEvents = new HashSet<>();
expectedEvents.add(QueryEvent.executing("rya",
- new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+ new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true, false)));
final Set<QueryEvent> queryEvents = new HashSet<>();
queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
index 4495e19..ea6a37b 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
@@ -87,9 +87,9 @@
// A thread that will attempt to notify the generator with a created query.
final UUID queryId = UUID.randomUUID();
- final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+ final StreamsQuery query = new StreamsQuery(queryId, "query", true, false);
final Thread notifyThread = new Thread(() -> {
- final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+ final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive(), query.isInsert());
final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
generator.notify(entry, Optional.of(query));
});
@@ -108,7 +108,7 @@
// Show work was added to the queue and the notifying thread died.
final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
- final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+ final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive(), query.isInsert()));
assertEquals(expected, event);
} finally {
shutdownSignal.set(true);
@@ -132,9 +132,9 @@
// A thread that will attempt to notify the generator with a created query.
final UUID queryId = UUID.randomUUID();
- final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+ final StreamsQuery query = new StreamsQuery(queryId, "query", true, false);
final Thread notifyThread = new Thread(() -> {
- final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+ final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive(), query.isInsert());
final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
generator.notify(entry, Optional.of(query));
});
@@ -145,7 +145,7 @@
try {
// Show work was added to the queue and the notifying thread died.
final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
- final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+ final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive(), query.isInsert()));
assertEquals(expected, event);
} finally {
shutdownSignal.set(true);
@@ -205,7 +205,7 @@
// A thread that will attempt to notify the generator with an update query change.
final UUID queryId = UUID.randomUUID();
- final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+ final StreamsQuery query = new StreamsQuery(queryId, "query", true, false);
final Thread notifyThread = new Thread(() -> {
final QueryChange change = QueryChange.update(queryId, true);
final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
@@ -218,7 +218,7 @@
try {
// Show work was added to the queue and the notifying thread died.
final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
- final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+ final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive(), query.isInsert()));
assertEquals(expected, event);
} finally {
shutdownSignal.set(true);
@@ -242,7 +242,7 @@
// A thread that will attempt to notify the generator with an update query change.
final UUID queryId = UUID.randomUUID();
- final StreamsQuery query = new StreamsQuery(queryId, "query", false);
+ final StreamsQuery query = new StreamsQuery(queryId, "query", false, false);
final Thread notifyThread = new Thread(() -> {
final QueryChange change = QueryChange.update(queryId, false);
final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
index 33c0719..95c7a54 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
@@ -68,7 +68,7 @@
// The message that indicates a query needs to be executed.
final String ryaInstance = "rya";
- final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "sparql", true);
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "sparql", true, false);
final QueryEvent executingEvent = QueryEvent.executing(ryaInstance, query);
// Release a latch if the startQuery method on the queryExecutor is invoked with the correct values.
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
index 04e70c0..f1c9e0f 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
@@ -50,7 +50,7 @@
//The new QueryChangeLog
final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
final String ryaInstance = "ryaTestInstance";
- final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true, false);
// when the query executor is told to start the test query on the test
// rya instance, count down on the countdown latch
@@ -69,7 +69,7 @@
//The listener created by the Query Manager
final SourceListener listener = (SourceListener) invocation.getArguments()[0];
listener.notifyCreate(ryaInstance, newChangeLog);
- newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+ newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive(), query.isInsert()));
return null;
}).when(source).subscribe(any(SourceListener.class));
@@ -91,7 +91,7 @@
public void testDeleteQuery() throws Exception {
//The new QueryChangeLog
final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
- final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true, false);
final String ryaInstance = "ryaTestInstance";
// when the query executor is told to start the test query on the test
@@ -121,7 +121,7 @@
final SourceListener listener = (SourceListener) invocation.getArguments()[0];
listener.notifyCreate(ryaInstance, newChangeLog);
Thread.sleep(1000);
- newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+ newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive(), query.isInsert()));
queryStarted.await(5, TimeUnit.SECONDS);
newChangeLog.write(QueryChange.delete(query.getQueryId()));
return null;
@@ -145,7 +145,7 @@
public void testUpdateQuery() throws Exception {
// The new QueryChangeLog
final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
- final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true, false);
final String ryaInstance = "ryaTestInstance";
// when the query executor is told to start the test query on the test
@@ -176,7 +176,7 @@
final SourceListener listener = (SourceListener) invocation.getArguments()[0];
listener.notifyCreate(ryaInstance, newChangeLog);
Thread.sleep(1000);
- newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+ newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive(), query.isInsert()));
queryStarted.await(5, TimeUnit.SECONDS);
newChangeLog.write(QueryChange.update(query.getQueryId(), false));
return null;
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index 6358104..83f040d 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -86,7 +86,7 @@
public void runQuery() throws Exception {
// Test values.
final String ryaInstance = "rya";
- final StreamsQuery sQuery = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
+ final StreamsQuery sQuery = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?person <urn:worksAt> ?business . }", true, false);
// Create the statements that will be loaded.
final ValueFactory vf = new ValueFactoryImpl();
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
index c0f888e..efbcf4b 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
@@ -46,14 +46,14 @@
@Test(expected = IllegalStateException.class)
public void startQuery_serviceNotStarted() throws Exception {
final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
- executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true));
+ executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true, false));
}
@Test
public void startQuery() throws Exception {
// Test values.
final String ryaInstance = "rya";
- final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
// Mock the streams factory so that we can tell if the start function is invoked by the executor.
final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
@@ -97,7 +97,7 @@
public void stopQuery() throws Exception {
// Test values.
final String ryaInstance = "rya";
- final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
// Mock the streams factory so that we can tell if the stop function is invoked by the executor.
final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
@@ -131,8 +131,8 @@
public void stopAll_noneForThatRyaInstance() throws Exception {
// Test values.
final String ryaInstance = "rya";
- final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
- final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
+ final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
// Mock the streams factory so that we can tell if the stop function is invoked by the executor.
final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
@@ -169,9 +169,9 @@
public void stopAll() throws Exception {
// Test values.
final String ryaInstance1 = "rya1";
- final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
final String ryaInstance2 = "rya2";
- final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
// Mock the streams factory so that we can tell if the stop function is invoked by the executor.
final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
@@ -230,9 +230,9 @@
public void getRunningQueryIds_noneStopped() throws Exception {
// Test values.
final String ryaInstance = "rya";
- final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
- final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
- final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
+ final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
+ final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
// Mock the streams factory so that we can figure out what is started.
final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
@@ -265,9 +265,9 @@
public void getRunningQueryIds_stoppedNoLongerListed() throws Exception {
// Test values.
final String ryaInstance = "rya";
- final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
- final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
- final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+ final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
+ final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
+ final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
// Mock the streams factory so that we can figure out what is started.
final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
index 5f7df84..fede1a9 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
@@ -28,13 +28,16 @@
import org.apache.rya.api.client.RyaClientException;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.api.utils.QueryInvestigator;
import org.apache.rya.shell.SharedShellState.ConnectionState;
+import org.apache.rya.shell.util.ConsolePrinter;
import org.apache.rya.shell.util.SparqlPrompt;
import org.apache.rya.shell.util.StreamsQueryFormatter;
import org.apache.rya.streams.api.RyaStreamsClient;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory;
+import org.openrdf.query.MalformedQueryException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
@@ -61,19 +64,23 @@
private final SharedShellState state;
private final SparqlPrompt sparqlPrompt;
+ private final ConsolePrinter consolePrinter;
/**
* Constructs an instance of {@link RyaStreamsCommands}.
*
* @param state - Holds shared state between all of the command classes. (not null)
* @param sparqlPrompt - Prompts a user for a SPARQL query. (not null)
+ * @param consolePrinter - Prints messages to the console. (not null)
*/
@Autowired
public RyaStreamsCommands(
final SharedShellState state,
- final SparqlPrompt sparqlPrompt) {
+ final SparqlPrompt sparqlPrompt,
+ final ConsolePrinter consolePrinter) {
this.state = requireNonNull(state);
this.sparqlPrompt = requireNonNull(sparqlPrompt);
+ this.consolePrinter = requireNonNull(consolePrinter);
}
/**
@@ -172,7 +179,10 @@
public String addQuery(
@CliOption(key = {"inactive"}, mandatory = false, unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
help = "Setting this flag will add the query, but not run it. (default: false)")
- final boolean inactive) {
+ final boolean inactive,
+ @CliOption(key = {"insert"}, mandatory = false, unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
+ help = "Setting this flag will insert the query's results back into Rya. (default: false)")
+ final boolean isInsert) {
final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
// Prompt the user for the SPARQL that defines the query.
@@ -184,10 +194,32 @@
return "";
}
- final StreamsQuery streamsQuery = streamsClient.getAddQuery().addQuery(sparql.get(), !inactive);
+ final boolean isConstructQuery = QueryInvestigator.isConstruct(sparql.get());
+ final boolean isInsertQuery = QueryInvestigator.isInsertWhere(sparql.get());
+
+ // If the user wants to insert a CONSTRUCT into Rya, print a warning.
+ if(isInsert && isConstructQuery) {
+ consolePrinter.println("WARNING: CONSTRUCT is part of the SPARQL Query API, so they do not normally");
+ consolePrinter.println("get written back to the triple store. Consider using an INSERT, which is");
+ consolePrinter.println("part of the SPARQL Update API, in the future.");
+ }
+
+ // If the user wants to use an INSERT query, but not insert it back into Rya, suggest using a construct.
+ if(!isInsert && isInsertQuery) {
+ consolePrinter.println("WARNING: INSERT is part of the SPARQL Update API, so they normally get written");
+ consolePrinter.println("back to the triple store. Consider using a CONSTRUCT, which is part of the");
+ consolePrinter.println("SPARQL Query API, in the future.");
+ }
+
+ // If the user wants to insert the query back into Rya, make sure it is a legal query to do that.
+ if(isInsert && !(isConstructQuery || isInsertQuery)) {
+ throw new RuntimeException("Only CONSTRUCT queries and INSERT updates may be inserted back to the triple store.");
+ }
+
+ final StreamsQuery streamsQuery = streamsClient.getAddQuery().addQuery(sparql.get(), !inactive, isInsert);
return "The added query's ID is " + streamsQuery.getQueryId();
- } catch (final IOException | RyaStreamsException e) {
+ } catch (final MalformedQueryException | IOException | RyaStreamsException e) {
throw new RuntimeException("Unable to add the SPARQL query to the Rya Streams subsystem.", e);
}
}
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
index 6c06caf..babeec8 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
@@ -63,6 +63,7 @@
final StringBuilder builder = new StringBuilder();
builder.append(" Query ID: ").append( query.getQueryId() ) .append("\n");
builder.append("Is Active: ").append( query.isActive() ).append("\n");
+ builder.append("Is Insert: ").append( query.isInsert() ).append("\n");
builder.append(" SPARQL: ").append( lines[0] ).append("\n");
for(int i = 1; i < lines.length; i++) {
diff --git a/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java b/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java
index 9f5a794..2759c33 100644
--- a/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java
+++ b/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java
@@ -36,6 +36,7 @@
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.shell.util.ConsolePrinter;
import org.apache.rya.shell.util.SparqlPrompt;
import org.apache.rya.streams.api.RyaStreamsClient;
import org.apache.rya.streams.api.entity.StreamsQuery;
@@ -72,7 +73,7 @@
try {
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class), mock(ConsolePrinter.class));
final String message = commands.configureRyaStreams("localhost", 6);
// Verify the request was forwarded to the mocked interactor.
@@ -107,7 +108,7 @@
state.connectedToInstance("unitTest");
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class), mock(ConsolePrinter.class));
final String message = commands.printRyaStreamsDetails();
final String expected = "This instance does not have any Rya Details, so it is unable to be connected to the Rya Streams subsystem.";
assertEquals(expected, message);
@@ -131,7 +132,7 @@
state.connectedToInstance("unitTest");
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class), mock(ConsolePrinter.class));
final String message = commands.printRyaStreamsDetails();
final String expected = "This instance of Rya has not been configured to use a Rya Streams subsystem.";
assertEquals(expected, message);
@@ -155,7 +156,7 @@
state.connectedToInstance("unitTest");
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class), mock(ConsolePrinter.class));
final String message = commands.printRyaStreamsDetails();
final String expected = "Kafka Hostname: localhost, Kafka Port: 6";
assertEquals(expected, message);
@@ -179,26 +180,28 @@
state.connectedToRyaStreams(mockClient);
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt);
- final String message = commands.addQuery(false);
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt, mock(ConsolePrinter.class));
+ final String message = commands.addQuery(false, false);
// Verify a message is printed to the user.
assertEquals("", message);
}
@Test
- public void addQuery() throws Exception {
+ public void addQuery_doNotInsertQuery() throws Exception {
// Mock the object that performs the rya streams operation.
final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
final AddQuery addQuery = mock(AddQuery.class);
when(mockClient.getAddQuery()).thenReturn(addQuery);
- final StreamsQuery addedQuery = new StreamsQuery(UUID.randomUUID(), "query", true);
- when(addQuery.addQuery(eq("query"), eq(true))).thenReturn(addedQuery);
+ final String sparql = "SELECT * WHERE { ?a ?b ?c }";
+
+ final StreamsQuery addedQuery = new StreamsQuery(UUID.randomUUID(), sparql, true, false);
+ when(addQuery.addQuery(eq(sparql), eq(true), eq(false))).thenReturn(addedQuery);
// Mock a SPARQL prompt that a user entered a query through.
final SparqlPrompt prompt = mock(SparqlPrompt.class);
- when(prompt.getSparql()).thenReturn(Optional.of("query"));
+ when(prompt.getSparql()).thenReturn(Optional.of(sparql));
// Mock a shell state and connect it to a Rya instance.
final SharedShellState state = new SharedShellState();
@@ -207,11 +210,11 @@
state.connectedToRyaStreams(mockClient);
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt);
- final String message = commands.addQuery(false);
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt, mock(ConsolePrinter.class));
+ final String message = commands.addQuery(false, false);
// Verify the interactor was invoked with the provided input.
- verify(addQuery).addQuery("query", true);
+ verify(addQuery).addQuery(sparql, true, false);
// Verify a message is printed to the user.
final String expected = "The added query's ID is " + addedQuery.getQueryId();
@@ -219,6 +222,124 @@
}
@Test
+ public void addQuery_insertConstructQuery() throws Exception {
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final AddQuery addQuery = mock(AddQuery.class);
+ when(mockClient.getAddQuery()).thenReturn(addQuery);
+
+ final String sparql =
+ "PREFIX vCard: <http://www.w3.org/2001/vcard-rdf/3.0#> " +
+ "PREFIX foaf: <http://xmlns.com/foaf/0.1/> " +
+ "CONSTRUCT { " +
+ "?X vCard:FN ?name . " +
+ "?X vCard:URL ?url . " +
+ "?X vCard:TITLE ?title . " +
+ "} " +
+ "FROM <http://www.w3.org/People/Berners-Lee/card> " +
+ "WHERE { " +
+ "OPTIONAL { ?X foaf:name ?name . FILTER isLiteral(?name) . } " +
+ "OPTIONAL { ?X foaf:homepage ?url . FILTER isURI(?url) . } " +
+ "OPTIONAL { ?X foaf:title ?title . FILTER isLiteral(?title) . } " +
+ "}";
+
+ final StreamsQuery addedQuery = new StreamsQuery(UUID.randomUUID(), sparql, true, true);
+ when(addQuery.addQuery(eq(sparql), eq(false), eq(true))).thenReturn(addedQuery);
+
+ // Mock a SPARQL prompt that a user entered a query through.
+ final SparqlPrompt prompt = mock(SparqlPrompt.class);
+ when(prompt.getSparql()).thenReturn(Optional.of(sparql));
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt, mock(ConsolePrinter.class));
+ final String message = commands.addQuery(true, true);
+
+ // Verify the interactor was invoked with the provided input.
+ verify(addQuery).addQuery(sparql, false, true);
+
+ // Verify a message is printed to the user.
+ final String expected = "The added query's ID is " + addedQuery.getQueryId();
+ assertEquals(expected, message);
+ }
+
+ @Test
+ public void addQuery_doNotInsertInsertUpdate() throws Exception {
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final AddQuery addQuery = mock(AddQuery.class);
+ when(mockClient.getAddQuery()).thenReturn(addQuery);
+
+ final String sparql =
+ "PREFIX Sensor: <http://example.com/Equipment.owl#> " +
+ "INSERT { " +
+ "?subject Sensor:test2 ?newValue " +
+ "} WHERE {" +
+ "values (?oldValue ?newValue) {" +
+ "('testValue1' 'newValue1')" +
+ "('testValue2' 'newValue2')" +
+ "}" +
+ "?subject Sensor:test1 ?oldValue" +
+ "}";
+
+ final StreamsQuery addedQuery = new StreamsQuery(UUID.randomUUID(), sparql, true, false);
+ when(addQuery.addQuery(eq(sparql), eq(false), eq(false))).thenReturn(addedQuery);
+
+ // Mock a SPARQL prompt that a user entered a query through.
+ final SparqlPrompt prompt = mock(SparqlPrompt.class);
+ when(prompt.getSparql()).thenReturn(Optional.of(sparql));
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt, mock(ConsolePrinter.class));
+ final String message = commands.addQuery(true, false);
+
+ // Verify the interactor was invoked with the provided input.
+ verify(addQuery).addQuery(sparql, false, false);
+
+ // Verify a message is printed to the user.
+ final String expected = "The added query's ID is " + addedQuery.getQueryId();
+ assertEquals(expected, message);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void addQuery_insertQueryNotCorrectType() throws Exception {
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final AddQuery addQuery = mock(AddQuery.class);
+ when(mockClient.getAddQuery()).thenReturn(addQuery);
+
+ final String sparql = "SELECT * WHERE { ?a ?b ?c }";
+
+ final StreamsQuery addedQuery = new StreamsQuery(UUID.randomUUID(), sparql, true, true);
+ when(addQuery.addQuery(eq(sparql), eq(false), eq(true))).thenReturn(addedQuery);
+
+ // Mock a SPARQL prompt that a user entered a query through.
+ final SparqlPrompt prompt = mock(SparqlPrompt.class);
+ when(prompt.getSparql()).thenReturn(Optional.of(sparql));
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt, mock(ConsolePrinter.class));
+ commands.addQuery(true, true);
+ }
+
+ @Test
public void deleteQuery() throws Exception {
// Mock the object that performs the rya streams operation.
final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
@@ -232,7 +353,7 @@
state.connectedToRyaStreams(mockClient);
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class), mock(ConsolePrinter.class));
final UUID queryId = UUID.randomUUID();
final String message = commands.deleteQuery(queryId.toString());
@@ -261,10 +382,10 @@
// Report the query as not running.
final UUID queryId = UUID.randomUUID();
- when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", false)));
+ when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", false, false)));
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class), mock(ConsolePrinter.class));
final String message = commands.startQuery(queryId.toString());
// Verify the interactor was invoked with the provided parameters.
@@ -292,10 +413,10 @@
// Report the query as running.
final UUID queryId = UUID.randomUUID();
- when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", true)));
+ when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", true, false)));
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class), mock(ConsolePrinter.class));
final String message = commands.startQuery(queryId.toString());
// Verify the interactor was not invoked.
@@ -323,10 +444,10 @@
// Report the query as running.
final UUID queryId = UUID.randomUUID();
- when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", true)));
+ when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", true, false)));
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class), mock(ConsolePrinter.class));
final String message = commands.stopQuery(queryId.toString());
// Verify the interactor was invoked with the provided parameters.
@@ -354,10 +475,10 @@
// Report the query as not running.
final UUID queryId = UUID.randomUUID();
- when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", false)));
+ when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", false, false)));
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class), mock(ConsolePrinter.class));
final String message = commands.stopQuery(queryId.toString());
// Verify the interactor was not invoked with the provided parameters.
@@ -379,15 +500,15 @@
new StreamsQuery(
UUID.fromString("33333333-3333-3333-3333-333333333333"),
"SELECT * WHERE { ?person <urn:worksAt> ?business . }",
- true),
+ true, false),
new StreamsQuery(
UUID.fromString("11111111-1111-1111-1111-111111111111"),
"SELECT * WHERE { ?a ?b ?c . }",
- true),
+ true, false),
new StreamsQuery(
UUID.fromString("22222222-2222-2222-2222-222222222222"),
"SELECT * WHERE { ?d ?e ?f . }",
- false));
+ false, false));
when(listQueries.all()).thenReturn(queries);
// Mock a shell state and connect it to a Rya instance.
@@ -397,7 +518,7 @@
state.connectedToRyaStreams(mockClient);
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class), mock(ConsolePrinter.class));
final String message = commands.listQueries();
// Verify the correct report is returned.
@@ -405,6 +526,7 @@
"-----------------------------------------------\n" +
" Query ID: 11111111-1111-1111-1111-111111111111\n" +
"Is Active: true\n" +
+ "Is Insert: false\n" +
" SPARQL: select ?a ?b ?c\n" +
" where {\n" +
" ?a ?b ?c.\n" +
@@ -412,6 +534,7 @@
"-----------------------------------------------\n" +
" Query ID: 22222222-2222-2222-2222-222222222222\n" +
"Is Active: false\n" +
+ "Is Insert: false\n" +
" SPARQL: select ?d ?e ?f\n" +
" where {\n" +
" ?d ?e ?f.\n" +
@@ -419,6 +542,7 @@
"-----------------------------------------------\n" +
" Query ID: 33333333-3333-3333-3333-333333333333\n" +
"Is Active: true\n" +
+ "Is Insert: false\n" +
" SPARQL: select ?person ?business\n" +
" where {\n" +
" ?person <urn:worksAt> ?business.\n" +
@@ -435,7 +559,7 @@
when(mockClient.getGetQuery()).thenReturn(getQuery);
final UUID queryId = UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa");
- final StreamsQuery query = new StreamsQuery(queryId, "SELECT * WHERE { ?a ?b ?c . }", true);
+ final StreamsQuery query = new StreamsQuery(queryId, "SELECT * WHERE { ?a ?b ?c . }", true, false);
when(getQuery.getQuery(queryId)).thenReturn(java.util.Optional.of(query));
// Mock a shell state and connect it to a Rya instance.
@@ -445,13 +569,14 @@
state.connectedToRyaStreams(mockClient);
// Execute the command.
- final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class), mock(ConsolePrinter.class));
final String message = commands.printQueryDetails(queryId.toString());
// Verify the correct report is returned.
final String expected =
" Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" +
"Is Active: true\n" +
+ "Is Insert: false\n" +
" SPARQL: select ?a ?b ?c\n" +
" where {\n" +
" ?a ?b ?c.\n" +
diff --git a/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java b/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java
index 8e5d251..594a002 100644
--- a/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java
+++ b/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java
@@ -39,13 +39,14 @@
final StreamsQuery query = new StreamsQuery(
UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa"),
"SELECT * WHERE { ?a ?b ?c . }",
- true);
+ true, false);
final String formatted = StreamsQueryFormatter.format(query);
// Ensure it has the expected format.
final String expected =
" Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" +
"Is Active: true\n" +
+ "Is Insert: false\n" +
" SPARQL: select ?a ?b ?c\n" +
" where {\n" +
" ?a ?b ?c.\n" +
@@ -61,15 +62,15 @@
new StreamsQuery(
UUID.fromString("33333333-3333-3333-3333-333333333333"),
"SELECT * WHERE { ?person <urn:worksAt> ?business . }",
- true),
+ true, true),
new StreamsQuery(
UUID.fromString("11111111-1111-1111-1111-111111111111"),
"SELECT * WHERE { ?a ?b ?c . }",
- true),
+ true, false),
new StreamsQuery(
UUID.fromString("22222222-2222-2222-2222-222222222222"),
"SELECT * WHERE { ?d ?e ?f . }",
- false));
+ false, true));
final String formatted = StreamsQueryFormatter.format(queries);
@@ -78,6 +79,7 @@
"-----------------------------------------------\n" +
" Query ID: 11111111-1111-1111-1111-111111111111\n" +
"Is Active: true\n" +
+ "Is Insert: false\n" +
" SPARQL: select ?a ?b ?c\n" +
" where {\n" +
" ?a ?b ?c.\n" +
@@ -85,6 +87,7 @@
"-----------------------------------------------\n" +
" Query ID: 22222222-2222-2222-2222-222222222222\n" +
"Is Active: false\n" +
+ "Is Insert: true\n" +
" SPARQL: select ?d ?e ?f\n" +
" where {\n" +
" ?d ?e ?f.\n" +
@@ -92,6 +95,7 @@
"-----------------------------------------------\n" +
" Query ID: 33333333-3333-3333-3333-333333333333\n" +
"Is Active: true\n" +
+ "Is Insert: true\n" +
" SPARQL: select ?person ?business\n" +
" where {\n" +
" ?person <urn:worksAt> ?business.\n" +