RYA-440 Added commands to Rya Shell used to interact with Rya Streams. Closes #267.
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
index 92b18a1..c122f43 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
@@ -45,6 +45,7 @@
private final ListInstances listInstances;
private final Optional<AddUser> addUser;
private final Optional<RemoveUser> removeUser;
+ private final SetRyaStreamsConfiguration setRyaStreamsConfig;
private final Uninstall uninstall;
private final LoadStatements loadStatements;
private final LoadStatementsFile loadStatementsFile;
@@ -65,6 +66,7 @@
final ListInstances listInstances,
final Optional<AddUser> addUser,
final Optional<RemoveUser> removeUser,
+ final SetRyaStreamsConfiguration setRyaStreamsConfig,
final Uninstall uninstall,
final LoadStatements loadStatements,
final LoadStatementsFile loadStatementsFile,
@@ -81,6 +83,7 @@
this.listInstances = requireNonNull(listInstances);
this.addUser = requireNonNull(addUser);
this.removeUser = requireNonNull(removeUser);
+ this.setRyaStreamsConfig = requireNonNull(setRyaStreamsConfig);
this.uninstall = requireNonNull(uninstall);
this.loadStatements = requireNonNull(loadStatements);
this.loadStatementsFile = requireNonNull(loadStatementsFile);
@@ -176,6 +179,13 @@
}
/**
+ * @return An instance of {@link SetRyaStreamsConfiguration} that is connected to a Rya storage.
+ */
+ public SetRyaStreamsConfiguration getSetRyaStreamsConfiguration() {
+ return setRyaStreamsConfig;
+ }
+
+ /**
* @return An instance of {@link Uninstall} that is connected to a Rya storage.
*/
public Uninstall getUninstall() {
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java b/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..5e75f06
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Update which Rya Streams subsystem a Rya instance is connected to.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface SetRyaStreamsConfiguration {
+
+ /**
+ * Update which Rya Streams subsystem a Rya instance is connected to.
+ *
+ * @param instanceName - Indicates which Rya instance will have a Rya Streams subsystem assigned to it. (not null)
+ * @param streamsDetails - Indicates which Rya Streams subsystem the instance will use. (not null)
+ * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+ * @throws RyaClientException Something caused the command to fail.
+ */
+ public void setRyaStreamsConfiguration(String ryaInstance, RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException;
+}
\ No newline at end of file
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java b/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
index 9d2c1e5..bda7390 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
@@ -29,25 +29,22 @@
import java.util.Map.Entry;
import java.util.Objects;
-import edu.umd.cs.findbugs.annotations.Nullable;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
-
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
-
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
/**
* Details about how a Rya instance's state.
*/
@Immutable
@DefaultAnnotation(NonNull.class)
public class RyaDetails implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
// General metadata about the instance.
private final String instanceName;
@@ -68,6 +65,9 @@
private final ProspectorDetails prospectorDetails;
private final JoinSelectivityDetails joinSelectivityDetails;
+ // Rya Streams Details.
+ private final Optional<RyaStreamsDetails> ryaStreamsDetails;
+
/**
* Private to prevent initialization through the constructor. To build
* instances of this class, use the {@link Builder}.
@@ -82,7 +82,8 @@
final TemporalIndexDetails temporalDetails,
final FreeTextIndexDetails freeTextDetails,
final ProspectorDetails prospectorDetails,
- final JoinSelectivityDetails joinSelectivityDetails) {
+ final JoinSelectivityDetails joinSelectivityDetails,
+ final Optional<RyaStreamsDetails> ryaStreamsDetails) {
this.instanceName = requireNonNull(instanceName);
this.version = requireNonNull(version);
this.users = requireNonNull(users);
@@ -93,6 +94,7 @@
this.freeTextDetails = requireNonNull(freeTextDetails);
this.prospectorDetails = requireNonNull(prospectorDetails);
this.joinSelectivityDetails = requireNonNull(joinSelectivityDetails);
+ this.ryaStreamsDetails = requireNonNull(ryaStreamsDetails);
}
/**
@@ -168,6 +170,13 @@
return joinSelectivityDetails;
}
+ /**
+ * @return Information about the instance's Rya Streams integration, if it was set.
+ */
+ public Optional<RyaStreamsDetails> getRyaStreamsDetails() {
+ return ryaStreamsDetails;
+ }
+
@Override
public int hashCode() {
return Objects.hash(
@@ -179,7 +188,8 @@
temporalDetails,
freeTextDetails,
prospectorDetails,
- joinSelectivityDetails);
+ joinSelectivityDetails,
+ ryaStreamsDetails);
}
@Override
@@ -197,7 +207,8 @@
Objects.equals(temporalDetails, details.temporalDetails) &&
Objects.equals(freeTextDetails, details.freeTextDetails) &&
Objects.equals(prospectorDetails, details.prospectorDetails) &&
- Objects.equals(joinSelectivityDetails, details.joinSelectivityDetails);
+ Objects.equals(joinSelectivityDetails, details.joinSelectivityDetails) &&
+ Objects.equals(ryaStreamsDetails, details.ryaStreamsDetails);
}
return false;
}
@@ -239,6 +250,9 @@
private ProspectorDetails prospectorDetails;
private JoinSelectivityDetails joinSelectivityDetails;
+ // Rya Streams Details.
+ private RyaStreamsDetails ryaStreamsDetails;
+
/**
* Construcst an empty instance of {@link Builder}.
*/
@@ -262,6 +276,7 @@
freeTextDetails = details.freeTextDetails;
prospectorDetails = details.prospectorDetails;
joinSelectivityDetails = details.joinSelectivityDetails;
+ ryaStreamsDetails = details.ryaStreamsDetails.orNull();
}
/**
@@ -375,6 +390,15 @@
}
/**
+ * @param ryaStreamsDetails - Information about the instance's Rya Streams integration.
+ * @return This {@link Builder} so that method invocations may be chained.
+ */
+ public Builder setRyaStreamsDetails(@Nullable final RyaStreamsDetails ryaStreamsDetails) {
+ this.ryaStreamsDetails = ryaStreamsDetails;
+ return this;
+ }
+
+ /**
* @return An instance of {@link RyaDetails} built using this
* builder's values.
*/
@@ -389,7 +413,8 @@
temporalDetails,
freeTextDetails,
prospectorDetails,
- joinSelectivityDetails);
+ joinSelectivityDetails,
+ Optional.fromNullable(ryaStreamsDetails));
}
}
@@ -1071,4 +1096,57 @@
return false;
}
}
+
+ /**
+ * Details about the Rya instance's Rya Streams integration.
+ */
+ public static class RyaStreamsDetails implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String hostname;
+ private final int port;
+
+ /**
+ * Constructs an instance of {@link RyaStreamsDetails}.
+ *
+ * @param hostname - The hostname used to communicate with the Rya Streams subsystem. (not null)
+ * @param port - The port used to communicate with the Rya Streams subsystem.
+ */
+ public RyaStreamsDetails(final String hostname, final int port) {
+ this.hostname = requireNonNull(hostname);
+ this.port = port;
+ }
+
+ /**
+ * @return The hostname used to communicate with the Rya Streams subsystem.
+ */
+ public String getHostname() {
+ return hostname;
+ }
+
+ /**
+ * @return The port used to communicate with the Rya Streams subsystem.
+ */
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(hostname, port);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if(this == obj) {
+ return true;
+ }
+ if(obj instanceof RyaStreamsDetails) {
+ final RyaStreamsDetails other = (RyaStreamsDetails) obj;
+ return Objects.equals(hostname, other.hostname) &&
+ port == other.port;
+ }
+ return false;
+ }
+ }
}
\ No newline at end of file
diff --git a/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java b/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
index a356877..b6e92e0 100644
--- a/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
+++ b/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
@@ -31,6 +31,7 @@
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
import org.junit.Test;
@@ -65,7 +66,8 @@
.setId("pcj 2")
.setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
.setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
- .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) );
+ .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+ .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5));
final RyaDetails details1 = builder.build();
final RyaDetails details2 = builder.build();
@@ -96,7 +98,8 @@
.setId("pcj 2")
.setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
.setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
- .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) );
+ .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+ .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5));
final RyaDetails details1 = builder.build();
final RyaDetails details2 = builder.build();
@@ -127,6 +130,7 @@
.setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
.setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
.setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+ .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5))
.build();
// Create a new Builder using another RyaDetails object.
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
index 8010808..f86c150 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
@@ -32,10 +32,10 @@
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
@@ -48,7 +48,6 @@
* Serializes configuration details for use in Mongo.
* The {@link DBObject} will look like:
* <pre>
- * {@code
* {
* "instanceName": <string>,
* "version": <string>?,
@@ -68,6 +67,10 @@
* "freeTextDetails": <boolean>,
* "prospectorDetails": <date>,
* "joinSelectivityDetails": <date>
+ * "ryaStreamsDetails": {
+ * "hostname": <string>
+ * "port": <int>
+ * }
* }
* </pre>
*/
@@ -91,13 +94,19 @@
public static final String PROSPECTOR_DETAILS_KEY = "prospectorDetails";
public static final String JOIN_SELECTIVITY_DETAILS_KEY = "joinSelectivitiyDetails";
+ public static final String RYA_STREAMS_DETAILS_KEY = "ryaStreamsDetails";
+ public static final String RYA_STREAMS_HOSTNAME_KEY = "hostname";
+ public static final String RYA_STREAMS_PORT_KEY = "port";
+
/**
- * Serializes {@link RyaDetails} to mongo {@link DBObject}.
- * @param details - The details to be serialized.
- * @return The mongo {@link DBObject}.
+ * Converts a {@link RyaDetails} object into its MongoDB {@link DBObject} equivalent.
+ *
+ * @param details - The details to convert. (not null)
+ * @return The MongoDB {@link DBObject} equivalent.
*/
public static BasicDBObject toDBObject(final RyaDetails details) {
- Preconditions.checkNotNull(details);
+ requireNonNull(details);
+
final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start()
.add(INSTANCE_KEY, details.getRyaInstanceName())
.add(VERSION_KEY, details.getRyaVersion())
@@ -106,12 +115,29 @@
.add(PCJ_DETAILS_KEY, toDBObject(details.getPCJIndexDetails()))
.add(TEMPORAL_DETAILS_KEY, details.getTemporalIndexDetails().isEnabled())
.add(FREETEXT_DETAILS_KEY, details.getFreeTextIndexDetails().isEnabled());
+
if(details.getProspectorDetails().getLastUpdated().isPresent()) {
builder.add(PROSPECTOR_DETAILS_KEY, details.getProspectorDetails().getLastUpdated().get());
}
+
if(details.getJoinSelectivityDetails().getLastUpdated().isPresent()) {
builder.add(JOIN_SELECTIVITY_DETAILS_KEY, details.getJoinSelectivityDetails().getLastUpdated().get());
}
+
+ // If the Rya Streams Details are present, then add them.
+ if(details.getRyaStreamsDetails().isPresent()) {
+ final RyaStreamsDetails ryaStreamsDetails = details.getRyaStreamsDetails().get();
+
+ // The embedded object that holds onto the fields.
+ final DBObject ryaStreamsFields = BasicDBObjectBuilder.start()
+ .add(RYA_STREAMS_HOSTNAME_KEY, ryaStreamsDetails.getHostname())
+ .add(RYA_STREAMS_PORT_KEY, ryaStreamsDetails.getPort())
+ .get();
+
+ // Add them to the main builder.
+ builder.add(RYA_STREAMS_DETAILS_KEY, ryaStreamsFields);
+ }
+
return (BasicDBObject) builder.get();
}
@@ -154,20 +180,38 @@
return builder.get();
}
+ /**
+ * Converts a MongoDB {@link DBObject} into its {@link RyaDetails} equivalent.
+ *
+ * @param mongoObj - The MongoDB object to convert. (not null)
+ * @return The equivalent {@link RyaDetails} object.
+ * @throws MalformedRyaDetailsException The MongoDB object could not be converted.
+ */
public static RyaDetails toRyaDetails(final DBObject mongoObj) throws MalformedRyaDetailsException {
+ requireNonNull(mongoObj);
final BasicDBObject basicObj = (BasicDBObject) mongoObj;
try {
- return RyaDetails.builder()
- .setRyaInstanceName(basicObj.getString(INSTANCE_KEY))
- .setRyaVersion(basicObj.getString(VERSION_KEY))
- .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY)))
- //RYA-215 .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY)))
- .setPCJIndexDetails(getPCJIndexDetails(basicObj))
- .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY)))
- .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY)))
- .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY))))
- .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY))))
- .build();
+ final RyaDetails.Builder builder = RyaDetails.builder()
+ .setRyaInstanceName(basicObj.getString(INSTANCE_KEY))
+ .setRyaVersion(basicObj.getString(VERSION_KEY))
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY)))
+ //RYA-215 .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY)))
+ .setPCJIndexDetails(getPCJIndexDetails(basicObj))
+ .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY)))
+ .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY)))
+ .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY))))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY))));
+
+ // If the Rya Streams Details are present, then add them.
+ if(basicObj.containsField(RYA_STREAMS_DETAILS_KEY)) {
+ final BasicDBObject streamsObject = (BasicDBObject) basicObj.get(RYA_STREAMS_DETAILS_KEY);
+ final String hostname = streamsObject.getString(RYA_STREAMS_HOSTNAME_KEY);
+ final int port = streamsObject.getInt(RYA_STREAMS_PORT_KEY);
+ builder.setRyaStreamsDetails(new RyaStreamsDetails(hostname, port));
+ }
+
+ return builder.build();
+
} catch(final Exception e) {
throw new MalformedRyaDetailsException("Failed to make RyaDetail from Mongo Object, it is malformed.", e);
}
@@ -213,14 +257,15 @@
}
/**
- * Exception thrown when a MongoDB {@link DBObject} is malformed when attemptin
- * to adapt it into a {@link RyaDetails}.
+ * Indicates a MongoDB {@link DBObject} was malformed when attempting
+ * to convert it into a {@link RyaDetails} object.
*/
public static class MalformedRyaDetailsException extends Exception {
private static final long serialVersionUID = 1L;
/**
- * Creates a new {@link MalformedRyaDetailsException}
+ * Creates a new {@link MalformedRyaDetailsException}.
+ *
* @param message - The message to be displayed by the exception.
* @param e - The source cause of the exception.
*/
@@ -228,4 +273,4 @@
super(message, e);
}
}
-}
+}
\ No newline at end of file
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
index 0ea9456..f5845c2 100644
--- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
@@ -32,6 +32,7 @@
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
import org.apache.rya.mongodb.instance.MongoDetailsAdapter.MalformedRyaDetailsException;
import org.junit.Test;
@@ -72,6 +73,7 @@
.setFreeTextDetails(new FreeTextIndexDetails(true))
.setProspectorDetails(new ProspectorDetails(Optional.fromNullable(new Date(0L))))
.setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.fromNullable(new Date(1L))))
+ .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 6))
.build();
final BasicDBObject actual = MongoDetailsAdapter.toDBObject(details);
@@ -100,7 +102,8 @@
+ "temporalDetails : true,"
+ "freeTextDetails : true,"
+ "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"},"
- + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}"
+ + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"},"
+ + "ryaStreamsDetails : { hostname : \"localhost\" , port : 6}"
+ "}"
);
@@ -134,7 +137,8 @@
+ "temporalDetails : true,"
+ "freeTextDetails : true,"
+ "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"},"
- + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}"
+ + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"},"
+ + "ryaStreamsDetails : { hostname : \"localhost\" , port : 6}"
+ "}"
);
@@ -163,6 +167,7 @@
.setFreeTextDetails(new FreeTextIndexDetails(true))
.setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(new Date(0L))))
.setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(new Date(1L))))
+ .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 6))
.build();
assertEquals(expected, actual);
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java b/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java
new file mode 100644
index 0000000..92a4c44
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.instance.RyaDetailsUpdater;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A base class that implements the core functionality of the {@link SetRyaStreamsConfiguration} interactor.
+ * Subclasses just need to implement {@link #getRyaDetailsRepo(String)} so that the common code may update
+ * any implementation of that repository.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class SetRyaStreamsConfigurationBase implements SetRyaStreamsConfiguration {
+
+ private final InstanceExists instanceExists;
+
+ /**
+ * Constructs an instance of {@link SetRyaStreamsConfigurationBase}.
+ *
+ * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+ */
+ public SetRyaStreamsConfigurationBase(final InstanceExists instanceExists) {
+ this.instanceExists = requireNonNull(instanceExists);
+ }
+
+ /**
+ * Get a {@link RyaDetailsRepository} that is connected to a specific instance of Rya.
+ *
+ * @param ryaInstance - The Rya instance the repository must be connected to. (not null)
+ * @return A {@link RyaDetailsRepository} connected to the specified Rya instance.
+ */
+ protected abstract RyaDetailsRepository getRyaDetailsRepo(String ryaInstance);
+
+ @Override
+ public void setRyaStreamsConfiguration(final String ryaInstance, final RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException{
+ requireNonNull(ryaInstance);
+ requireNonNull(streamsDetails);
+
+ // Verify the Rya Instance exists.
+ if(!instanceExists.exists(ryaInstance)) {
+ throw new InstanceDoesNotExistException("There is no Rya instance named '" + ryaInstance + "' in this storage.");
+ }
+
+ // Update the old details object using the provided Rya Streams details.
+ final RyaDetailsRepository repo = getRyaDetailsRepo(ryaInstance);
+ try {
+ new RyaDetailsUpdater(repo).update(oldDetails -> {
+ final RyaDetails.Builder builder = RyaDetails.builder(oldDetails);
+ builder.setRyaStreamsDetails(streamsDetails);
+ return builder.build();
+ });
+ } catch (CouldNotApplyMutationException | RyaDetailsRepositoryException e) {
+ throw new RyaClientException("Unable to update which Rya Streams subsystem is used by the '" +
+ ryaInstance + "' Rya instance.", e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
index 17fddaa..fdabea9 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
@@ -23,6 +23,7 @@
import java.util.Optional;
import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.api.client.InstanceExists;
import org.apache.rya.api.client.RyaClient;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -50,6 +51,8 @@
requireNonNull(connector);
// Build the RyaCommands option with the initialized commands.
+ final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, connector);
+
return new RyaClient(
new AccumuloInstall(connectionDetails, connector),
new AccumuloCreatePCJ(connectionDetails, connector),
@@ -59,10 +62,11 @@
Optional.of(new AccumuloListIncrementalQueries(connectionDetails, connector)),
new AccumuloBatchUpdatePCJ(connectionDetails, connector),
new AccumuloGetInstanceDetails(connectionDetails, connector),
- new AccumuloInstanceExists(connectionDetails, connector),
+ instanceExists,
new AccumuloListInstances(connectionDetails, connector),
Optional.of(new AccumuloAddUser(connectionDetails, connector)),
Optional.of(new AccumuloRemoveUser(connectionDetails, connector)),
+ new AccumuloSetRyaStreamsConfiguration(instanceExists, connector),
new AccumuloUninstall(connectionDetails, connector),
new AccumuloLoadStatements(connectionDetails, connector),
new AccumuloLoadStatementsFile(connectionDetails, connector),
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..f19d9fb
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.SetRyaStreamsConfiguration;
+import org.apache.rya.api.client.SetRyaStreamsConfigurationBase;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An Accumulo implementation of {@link SetRyaStreamsConfiguration}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase {
+
+ private final Connector connector;
+
+ /**
+ * Constructs an instance of {@link AccumuloSetRyaStreamsConfiguration}.
+ *
+ * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+ * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
+ */
+ public AccumuloSetRyaStreamsConfiguration(
+ final InstanceExists instanceExists,
+ final Connector connector) {
+ super(instanceExists);
+ this.connector = requireNonNull(connector);
+ }
+
+ @Override
+ protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) {
+ requireNonNull(ryaInstance);
+ return new AccumuloRyaInstanceDetailsRepository(connector, ryaInstance);
+ }
+}
\ No newline at end of file
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
index fbbec2a..5fa4877 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
@@ -66,6 +66,7 @@
new MongoListInstances(adminClient),
Optional.empty(),
Optional.empty(),
+ new MongoSetRyaStreamsConfiguration(instanceExists, adminClient),
new MongoUninstall(adminClient, instanceExists),
new MongoLoadStatements(connectionDetails, instanceExists),
new MongoLoadStatementsFile(connectionDetails, instanceExists),
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..592e663
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.SetRyaStreamsConfiguration;
+import org.apache.rya.api.client.SetRyaStreamsConfigurationBase;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+
+import com.mongodb.MongoClient;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A MongoDB implementation of {@link SetRyaStreamsConfiguration}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase {
+
+ private final MongoClient client;
+
+ /**
+ * Constructs an instance of {@link MongoSetRyaStreamsConfiguration}.
+ *
+ * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+ * @param client - The MongoDB client used to connect to the Rya storage. (not null)
+ */
+ public MongoSetRyaStreamsConfiguration(
+ final InstanceExists instanceExists,
+ final MongoClient client) {
+ super(instanceExists);
+ this.client = requireNonNull(client);
+ }
+
+ @Override
+ protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) {
+ requireNonNull(ryaInstance);
+ return new MongoRyaInstanceDetailsRepository(client, ryaInstance);
+ }
+}
\ No newline at end of file
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java
new file mode 100644
index 0000000..928a29e
--- /dev/null
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import org.apache.rya.accumulo.AccumuloITBase;
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link AccumuloSetRyaStreamsConfiguration}.
+ */
+public class AccumuloSetRyaStreamsConfigurationIT extends AccumuloITBase {
+
+ @Test(expected = InstanceDoesNotExistException.class)
+ public void instanceDoesNotExist() throws Exception {
+ final String ryaInstance = getRyaInstanceName();
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ getUsername(),
+ getPassword().toCharArray(),
+ getInstanceName(),
+ getZookeepers());
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+
+ // Skip the install step to create error causing situation.
+ final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+ ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+ }
+
+ @Test
+ public void updatesRyaDetails() throws Exception {
+ final String ryaInstance = getRyaInstanceName();
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ getUsername(),
+ getPassword().toCharArray(),
+ getInstanceName(),
+ getZookeepers());
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+
+ // Install an instance of Rya.
+ final Install installRya = ryaClient.getInstall();
+ final InstallConfiguration installConf = InstallConfiguration.builder()
+ .build();
+ installRya.install(ryaInstance, installConf);
+
+ // Fetch its details and show they do not have any RyaStreamsDetails.
+ com.google.common.base.Optional<RyaStreamsDetails> streamsDetails =
+ ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+ assertFalse(streamsDetails.isPresent());
+
+ // Set the details.
+ final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+ ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+
+ // Fetch its details again and show that they are now filled in.
+ streamsDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+ assertEquals(details, streamsDetails.get());
+ }
+}
\ No newline at end of file
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java
new file mode 100644
index 0000000..5fea578
--- /dev/null
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.mongo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Optional;
+
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.mongodb.MongoITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link MongoSetRyaStreamsConfiguration}.
+ */
+public class MongoSetRyaStreamsConfigurationIT extends MongoITBase {
+
+ @Test(expected = InstanceDoesNotExistException.class)
+ public void instanceDoesNotExist() throws Exception {
+ final RyaClient ryaClient = MongoRyaClientFactory.build(getConnectionDetails(), getMongoClient());
+
+ // Skip the install step to create error causing situation.
+ final String ryaInstance = conf.getRyaInstanceName();
+ final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+ ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+ }
+
+ @Test
+ public void updatesRyaDetails() throws Exception {
+ final RyaClient ryaClient = MongoRyaClientFactory.build(getConnectionDetails(), getMongoClient());
+
+ // Install an instance of Rya.
+ final String ryaInstance = conf.getRyaInstanceName();
+ final Install installRya = ryaClient.getInstall();
+ final InstallConfiguration installConf = InstallConfiguration.builder()
+ .build();
+ installRya.install(ryaInstance, installConf);
+
+ // Fetch its details and show they do not have any RyaStreamsDetails.
+ com.google.common.base.Optional<RyaStreamsDetails> streamsDetails =
+ ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+ assertFalse(streamsDetails.isPresent());
+
+ // Set the details.
+ final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+ ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+
+ // Fetch its details again and show that they are now filled in.
+ streamsDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+ assertEquals(details, streamsDetails.get());
+ }
+
+ private MongoConnectionDetails getConnectionDetails() {
+ final Optional<char[]> password = conf.getMongoPassword() != null ?
+ Optional.of(conf.getMongoPassword().toCharArray()) :
+ Optional.empty();
+
+ return new MongoConnectionDetails(
+ conf.getMongoHostname(),
+ Integer.parseInt(conf.getMongoPort()),
+ Optional.ofNullable(conf.getMongoUser()),
+ password);
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java
new file mode 100644
index 0000000..ee86e41
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.interactor.AddQuery;
+import org.apache.rya.streams.api.interactor.DeleteQuery;
+import org.apache.rya.streams.api.interactor.GetQuery;
+import org.apache.rya.streams.api.interactor.GetQueryResultStream;
+import org.apache.rya.streams.api.interactor.ListQueries;
+import org.apache.rya.streams.api.interactor.StartQuery;
+import org.apache.rya.streams.api.interactor.StopQuery;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides access to a set of Rya Streams functions.Statement
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaStreamsClient implements AutoCloseable {
+
+ private final AddQuery addQuery;
+ private final GetQuery getQuery;
+ private final DeleteQuery deleteQuery;
+ private final GetQueryResultStream<VisibilityStatement> getStatementResultStream;
+ private final GetQueryResultStream<VisibilityBindingSet> getBindingSetResultStream;
+ private final ListQueries listQueries;
+ private final StartQuery startQuery;
+ private final StopQuery stopQuery;
+
+ /**
+ * Constructs an instance of {@link RyaStreamsClient}.
+ */
+ public RyaStreamsClient(
+ final AddQuery addQuery,
+ final GetQuery getQuery,
+ final DeleteQuery deleteQuery,
+ final GetQueryResultStream<VisibilityStatement> getStatementResultStream,
+ final GetQueryResultStream<VisibilityBindingSet> getBindingSetResultStream,
+ final ListQueries listQueries,
+ final StartQuery startQuery,
+ final StopQuery stopQuery) {
+ this.addQuery = requireNonNull(addQuery);
+ this.getQuery = requireNonNull(getQuery);
+ this.deleteQuery = requireNonNull(deleteQuery);
+ this.getStatementResultStream = requireNonNull(getStatementResultStream);
+ this.getBindingSetResultStream = requireNonNull(getBindingSetResultStream);
+ this.listQueries = requireNonNull(listQueries);
+ this.startQuery = requireNonNull(startQuery);
+ this.stopQuery = requireNonNull(stopQuery);
+ }
+
+ /**
+ * @return The connected {@link AddQuery} interactor.
+ */
+ public AddQuery getAddQuery() {
+ return addQuery;
+ }
+
+ /**
+ * @return The connected {@link GetQuery} interactor.
+ */
+ public GetQuery getGetQuery() {
+ return getQuery;
+ }
+
+ /**
+ * @return The connected {@link DeleteQuery} interactor.
+ */
+ public DeleteQuery getDeleteQuery() {
+ return deleteQuery;
+ }
+
+ /**
+ * @return The connected {@link GetQueryResultStream} interactor for a query that produces
+ * {@link VisibilityStatement}s.
+ */
+ public GetQueryResultStream<VisibilityStatement> getGetStatementResultStream() {
+ return getStatementResultStream;
+ }
+
+ /**
+ * @return The connected {@link GetQueryResultStream} interactor for a query that produces
+ * {@link VisibilityBindingSet}s.
+ */
+ public GetQueryResultStream<VisibilityBindingSet> getGetBindingSetResultStream() {
+ return getBindingSetResultStream;
+ }
+
+ /**
+ * @return The connected {@link ListQueries} interactor.
+ */
+ public ListQueries getListQueries() {
+ return listQueries;
+ }
+
+ /**
+ * @return The connected {@link StartQuery} interactor.
+ */
+ public StartQuery getStartQuery() {
+ return startQuery;
+ }
+
+ /**
+ * @return The connected {@link StopQuery} interactor.
+ */
+ public StopQuery getStopQuery() {
+ return stopQuery;
+ }
+
+ /**
+ * By defualt, this client doesn't close anything. If an implementation of the client
+ * requires closing components, then override this method.
+ */
+ @Override
+ public void close() throws Exception { }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java
new file mode 100644
index 0000000..1293714
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor;
+
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Get a {@link StreamsQuery} from Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface GetQuery {
+
+ /**
+ * Get a {@link StreamsQuery} from Rya Streams.
+ *
+ * @param queryId - Identifies the query to fetch. (not null)
+ * @return The {@link StreamsQuery} for the {@code queryId}; if one is stored for the ID.
+ * @throws RyaStreamsException The query could not be fetched.
+ */
+ public Optional<StreamsQuery> getQuery(UUID queryId) throws RyaStreamsException;
+}
\ No newline at end of file
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java
new file mode 100644
index 0000000..14b93ab
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor.defaults;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.GetQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Get a {@link StreamsQuery} from Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public class DefaultGetQuery implements GetQuery {
+ private final QueryRepository repository;
+
+ /**
+ * Constructs an instance of {@link DefaultGetQuery}.
+ *
+ * @param repository - The {@link QueryRepository} to get queries from. (not null)
+ */
+ public DefaultGetQuery(final QueryRepository repository) {
+ this.repository = requireNonNull(repository);
+ }
+
+ @Override
+ public Optional<StreamsQuery> getQuery(final UUID queryId) throws RyaStreamsException {
+ requireNonNull(queryId);
+ return repository.get(queryId);
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
new file mode 100644
index 0000000..9250d9d
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
+import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class KafkaRyaStreamsClientFactory {
+ private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class);
+
+ /**
+ * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams
+ * that is backed by Kafka.
+ *
+ * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null)
+ * @param kafkaHostname - The hostname of the Kafka Broker.
+ * @param kafkaPort - The port of the Kafka Broker.
+ * @return The initialized commands.
+ */
+ public static RyaStreamsClient make(
+ final String ryaInstance,
+ final String kafkaHostname,
+ final int kafkaPort) {
+ requireNonNull(ryaInstance);
+ requireNonNull(kafkaHostname);
+
+ // Setup Query Repository used by the Kafka Rya Streams subsystem.
+ final Producer<?, QueryChange> queryProducer =
+ makeProducer(kafkaHostname, kafkaPort, StringSerializer.class, QueryChangeSerializer.class);
+ final Consumer<?, QueryChange>queryConsumer =
+ fromStartConsumer(kafkaHostname, kafkaPort, StringDeserializer.class, QueryChangeDeserializer.class);
+ final String changeLogTopic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+ final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+ final QueryRepository queryRepo = new InMemoryQueryRepository(changeLog);
+
+ // Create the Rya Streams client that is backed by a Kafka Query Change Log.
+ return new RyaStreamsClient(
+ new DefaultAddQuery(queryRepo),
+ new DefaultGetQuery(queryRepo),
+ new DefaultDeleteQuery(queryRepo),
+ new KafkaGetQueryResultStream<>(kafkaHostname, "" + kafkaPort, VisibilityStatementDeserializer.class),
+ new KafkaGetQueryResultStream<>(kafkaHostname, "" + kafkaPort, VisibilityBindingSetDeserializer.class),
+ new DefaultListQueries(queryRepo),
+ new DefaultStartQuery(queryRepo),
+ new DefaultStopQuery(queryRepo)) {
+
+ /**
+ * Close the QueryRepository used by the returned client.
+ */
+ @Override
+ public void close() {
+ try {
+ queryRepo.close();
+ } catch (final Exception e) {
+ log.warn("Couldn't close a QueryRepository.", e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Create a {@link Producer} that is able to write to a topic in Kafka.
+ *
+ * @param kafkaHostname - The Kafka broker hostname. (not null)
+ * @param kafkaPort - The Kafka broker port.
+ * @param keySerializerClass - Serializes the keys. (not null)
+ * @param valueSerializerClass - Serializes the values. (not null)
+ * @return A {@link Producer} that can be used to write records to a topic.
+ */
+ private static <K, V> Producer<K, V> makeProducer(
+ final String kafkaHostname,
+ final int kakfaPort,
+ final Class<? extends Serializer<K>> keySerializerClass,
+ final Class<? extends Serializer<V>> valueSerializerClass) {
+ requireNonNull(kafkaHostname);
+ requireNonNull(keySerializerClass);
+ requireNonNull(valueSerializerClass);
+
+ final Properties producerProps = new Properties();
+ producerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort);
+ producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
+ producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
+ return new KafkaProducer<>(producerProps);
+ }
+
+ /**
+ * Create a {@link Consumer} that has a unique group ID and reads everything from a topic in Kafka
+ * starting at the earliest point by default.
+ *
+ * @param kafkaHostname - The Kafka broker hostname. (not null)
+ * @param kafkaPort - The Kafka broker port.
+ * @param keyDeserializerClass - Deserializes the keys. (not null)
+ * @param valueDeserializerClass - Deserializes the values. (not null)
+ * @return A {@link Consumer} that can be used to read records from a topic.
+ */
+ private static <K, V> Consumer<K, V> fromStartConsumer(
+ final String kafkaHostname,
+ final int kakfaPort,
+ final Class<? extends Deserializer<K>> keyDeserializerClass,
+ final Class<? extends Deserializer<V>> valueDeserializerClass) {
+ requireNonNull(kafkaHostname);
+ requireNonNull(keyDeserializerClass);
+ requireNonNull(valueDeserializerClass);
+
+ final Properties consumerProps = new Properties();
+ consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort);
+ consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
+ consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
+ return new KafkaConsumer<>(consumerProps);
+ }
+}
\ No newline at end of file
diff --git a/extras/shell/pom.xml b/extras/shell/pom.xml
index 1a07400..fcca909 100644
--- a/extras/shell/pom.xml
+++ b/extras/shell/pom.xml
@@ -59,6 +59,10 @@
<artifactId>rya.pcj.fluo.api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.streams.kafka</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-core</artifactId>
</dependency>
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
index b6fddb0..b4168af 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
@@ -34,10 +34,15 @@
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.api.client.mongo.MongoConnectionDetails;
import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
import org.apache.rya.shell.SharedShellState.ConnectionState;
+import org.apache.rya.shell.SharedShellState.ShellState;
import org.apache.rya.shell.SharedShellState.StorageType;
import org.apache.rya.shell.util.ConnectorFactory;
import org.apache.rya.shell.util.PasswordPrompt;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
@@ -222,31 +227,58 @@
@CliCommand(value = CONNECT_INSTANCE_CMD, help = "Connect to a specific Rya instance")
public void connectToInstance(
@CliOption(key = {"instance"}, mandatory = true, help = "The name of the Rya instance the shell will interact with.")
- final String instance) {
+ final String ryaInstance) {
+ final RyaClient ryaClient = sharedState.getShellState().getConnectedCommands().get();
try {
- final InstanceExists instanceExists = sharedState.getShellState().getConnectedCommands().get().getInstanceExists();
+ final InstanceExists instanceExists = ryaClient.getInstanceExists();
// Make sure the requested instance exists.
- if(!instanceExists.exists(instance)) {
- throw new RuntimeException(String.format("'%s' does not match an existing Rya instance.", instance));
+ if(!instanceExists.exists(ryaInstance)) {
+ throw new RuntimeException(String.format("'%s' does not match an existing Rya instance.", ryaInstance));
+ }
+
+ // Store the instance name in the shared state.
+ sharedState.connectedToInstance(ryaInstance);
+
+ // If the Rya instance is configured to interact with Rya Streams, then connect the
+ // Rya Streams client to the shared state.
+ final com.google.common.base.Optional<RyaDetails> ryaDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance);
+ if(ryaDetails.isPresent()) {
+ final com.google.common.base.Optional<RyaStreamsDetails> streamsDetails = ryaDetails.get().getRyaStreamsDetails();
+ if(streamsDetails.isPresent()) {
+ final String kafkaHostname = streamsDetails.get().getHostname();
+ final int kafkaPort = streamsDetails.get().getPort();
+ final RyaStreamsClient streamsClient = KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort);
+ sharedState.connectedToRyaStreams(streamsClient);
+ }
}
} catch(final RyaClientException e) {
throw new RuntimeException("Could not connect to Rya instance. Reason: " + e.getMessage(), e);
}
-
- // Store the instance name in the shared state.
- sharedState.connectedToInstance(instance);
}
@CliCommand(value = DISCONNECT_COMMAND_NAME_CMD, help = "Disconnect the shell's Rya storage connection (Accumulo).")
public void disconnect() {
+ final ShellState shellState = sharedState.getShellState();
+
// If connected to Mongo, there is a client that needs to be closed.
- final com.google.common.base.Optional<MongoClient> mongoAdminClient = sharedState.getShellState().getMongoAdminClient();
+ final com.google.common.base.Optional<MongoClient> mongoAdminClient = shellState.getMongoAdminClient();
if(mongoAdminClient.isPresent()) {
mongoAdminClient.get().close();
}
+ // If connected to Rya Streams, then close the associated resources.
+ final com.google.common.base.Optional<RyaStreamsClient> streamsClient = shellState.getRyaStreamsCommands();
+ if(streamsClient.isPresent()) {
+ try {
+ streamsClient.get().close();
+ } catch (final Exception e) {
+ System.err.print("Could not close the RyaStreamsClient.");
+ e.printStackTrace();
+ }
+ }
+
// Update the shared state to disconnected.
sharedState.disconnected();
}
-}
+}
\ No newline at end of file
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
new file mode 100644
index 0000000..5f7df84
--- /dev/null
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.shell.SharedShellState.ConnectionState;
+import org.apache.rya.shell.util.SparqlPrompt;
+import org.apache.rya.shell.util.StreamsQueryFormatter;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Optional;
+
+/**
+ * Rya Shell commands used to interact with the Rya Streams subsystem.
+ */
+@Component
+public class RyaStreamsCommands implements CommandMarker {
+
+ public static final String STREAMS_CONFIGURE_CMD = "streams-configure";
+ public static final String STREAMS_DETAILS_CMD = "streams-details";
+ public static final String STREAM_QUERIES_ADD_CMD = "streams-queries-add";
+ public static final String STREAM_QUERIES_DELETE_CMD = "streams-queries-delete";
+ public static final String STREAM_QUERIES_START_CMD = "streams-queries-start";
+ public static final String STREAM_QUERIES_STOP_CMD = "streams-queries-stop";
+ public static final String STREAM_QUERIES_LIST_CMD = "streams-queries-list";
+ public static final String STREAM_QUERIES_DETAILS_CMD = "streams-queries-details";
+
+ private final SharedShellState state;
+ private final SparqlPrompt sparqlPrompt;
+
+ /**
+ * Constructs an instance of {@link RyaStreamsCommands}.
+ *
+ * @param state - Holds shared state between all of the command classes. (not null)
+ * @param sparqlPrompt - Prompts a user for a SPARQL query. (not null)
+ */
+ @Autowired
+ public RyaStreamsCommands(
+ final SharedShellState state,
+ final SparqlPrompt sparqlPrompt) {
+ this.state = requireNonNull(state);
+ this.sparqlPrompt = requireNonNull(sparqlPrompt);
+ }
+
+ /**
+ * Enables commands that become available when connected to a Rya instance.
+ */
+ @CliAvailabilityIndicator({
+ STREAMS_CONFIGURE_CMD,
+ STREAMS_DETAILS_CMD})
+ public boolean areConfigCommandsAvailable() {
+ return state.getShellState().getConnectionState() == ConnectionState.CONNECTED_TO_INSTANCE;
+ }
+
+ /**
+ * Enables commands that become available when a Rya instance has a configured Rya Streams subsystem to use.
+ */
+ @CliAvailabilityIndicator({
+ STREAM_QUERIES_ADD_CMD,
+ STREAM_QUERIES_DELETE_CMD,
+ STREAM_QUERIES_START_CMD,
+ STREAM_QUERIES_STOP_CMD,
+ STREAM_QUERIES_LIST_CMD,
+ STREAM_QUERIES_DETAILS_CMD})
+ public boolean areQueriesCommandsAvailable() {
+ return state.getShellState().getRyaStreamsCommands().isPresent();
+ }
+
+ @CliCommand(value = STREAMS_CONFIGURE_CMD, help = "Connect a Rya Streams subsystem to a Rya Instance.")
+ public String configureRyaStreams(
+ @CliOption(key = {"kafkaHostname"}, mandatory = true, help = "The hostname of the Kafka Broker.")
+ final String kafkaHostname,
+ @CliOption(key = {"kafkaPort"}, mandatory = true, help = "The port of the Kafka Broker.")
+ final int kafkaPort) {
+
+ // If this instance was connected to a different Rya Streams subsystem, then close that client.
+ final Optional<RyaStreamsClient> oldClient = state.getShellState().getRyaStreamsCommands();
+ if(oldClient.isPresent()) {
+ try {
+ oldClient.get().close();
+ } catch (final Exception e) {
+ System.err.print("Warning: Could not close the old Rya Streams Client.");
+ e.printStackTrace();
+ }
+ }
+
+ // Update the Rya Details for the connected Rya Instance.
+ final String ryaInstance = state.getShellState().getRyaInstanceName().get();
+ final RyaClient ryaClient = state.getShellState().getConnectedCommands().get();
+ try {
+ final RyaStreamsDetails streamsDetails = new RyaStreamsDetails(kafkaHostname, kafkaPort);
+ ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, streamsDetails);
+ } catch (final RyaClientException e) {
+ throw new RuntimeException("Could not update the Rya instance's Rya Details to include the new " +
+ "information. This command failed to complete.", e);
+ }
+
+ // Connect a Rya Streams Client and set it in the shared state.
+ final RyaStreamsClient newClient = KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort);
+ state.connectedToRyaStreams(newClient);
+
+ // Return a message that indicates the operation was successful.
+ if(oldClient.isPresent()) {
+ return "The Rya Streams subsystem that this Rya instance uses has been changed. Any queries that were " +
+ "maintained by the previous subsystem will need to be migrated to the new one.";
+ } else {
+ return "The Rya Instance has been updated to use the provided Rya Streams subsystem. " +
+ "Rya Streams commands are now avaiable while connected to this instance.";
+ }
+ }
+
+ @CliCommand(value = STREAMS_DETAILS_CMD, help = "Print information about which Rya Streams subsystem the Rya instance is connected to.")
+ public String printRyaStreamsDetails() {
+ final String ryaInstance = state.getShellState().getRyaInstanceName().get();
+ final RyaClient client = state.getShellState().getConnectedCommands().get();
+ try {
+ // Handle the case where the instance does not have Rya Details.
+ final Optional<RyaDetails> details = client.getGetInstanceDetails().getDetails(ryaInstance);
+ if(!details.isPresent()) {
+ return "This instance does not have any Rya Details, so it is unable to be connected to the Rya Streams subsystem.";
+ }
+
+ // Print a message based on if the instance is connected to Rya Streams.
+ final Optional<RyaStreamsDetails> streamsDetails = details.get().getRyaStreamsDetails();
+ if(!streamsDetails.isPresent()) {
+ return "This instance of Rya has not been configured to use a Rya Streams subsystem.";
+ }
+
+ // Print the details about which Rya Streams subsystem is being used.
+ return "Kafka Hostname: " + streamsDetails.get().getHostname() + ", Kafka Port: " + streamsDetails.get().getPort();
+
+ } catch (final RyaClientException e) {
+ throw new RuntimeException("Could not fetch the Rya Details for this Rya instance.", e);
+ }
+ }
+
+ @CliCommand(value = STREAM_QUERIES_ADD_CMD, help = "Add a SPARQL query to the Rya Streams subsystem.")
+ public String addQuery(
+ @CliOption(key = {"inactive"}, mandatory = false, unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
+ help = "Setting this flag will add the query, but not run it. (default: false)")
+ final boolean inactive) {
+ final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+
+ // Prompt the user for the SPARQL that defines the query.
+ try {
+ final Optional<String> sparql = sparqlPrompt.getSparql();
+
+ // If the user aborted the prompt, return.
+ if(!sparql.isPresent()) {
+ return "";
+ }
+
+ final StreamsQuery streamsQuery = streamsClient.getAddQuery().addQuery(sparql.get(), !inactive);
+ return "The added query's ID is " + streamsQuery.getQueryId();
+
+ } catch (final IOException | RyaStreamsException e) {
+ throw new RuntimeException("Unable to add the SPARQL query to the Rya Streams subsystem.", e);
+ }
+ }
+
+ @CliCommand(value = STREAM_QUERIES_DELETE_CMD, help = "Delete a SPARQL query from the Rya Streams subsystem.")
+ public String deleteQuery(
+ @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to remove.")
+ final String queryId) {
+
+ final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+ final UUID id = UUID.fromString(queryId);
+ try {
+ streamsClient.getDeleteQuery().delete(id);
+ } catch (final RyaStreamsException e) {
+ throw new RuntimeException("Could not delete the query from the Rya Streams subsystem.", e);
+ }
+ return "The query has been deleted.";
+ }
+
+ @CliCommand(value = STREAM_QUERIES_START_CMD, help = "Start processing a SPARQL query using the Rya Streams subsystem.")
+ public String startQuery(
+ @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to start processing.")
+ final String queryId) {
+ final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+
+ try {
+ // Ensure the query exists.
+ final UUID id = UUID.fromString(queryId);
+ final java.util.Optional<StreamsQuery> streamsQuery = streamsClient.getGetQuery().getQuery(id);
+ if(!streamsQuery.isPresent()) {
+ throw new RuntimeException("No Rya Streams query exists for ID " + queryId);
+ }
+
+ // Ensure it isn't already started.
+ if(streamsQuery.get().isActive()) {
+ return "That query is already running.";
+ }
+
+ // Start it.
+ streamsClient.getStartQuery().start(id);
+ return "The query will be processed by the Rya Streams subsystem.";
+
+ } catch (final RyaStreamsException e) {
+ throw new RuntimeException("Unable to start the Query.", e);
+ }
+ }
+
+ @CliCommand(value = STREAM_QUERIES_STOP_CMD, help = "Stop processing a SPARQL query using the Rya Streams subsystem.")
+ public String stopQuery(
+ @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to stop processing.")
+ final String queryId) {
+ final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+
+ try {
+ // Ensure the query exists.
+ final UUID id = UUID.fromString(queryId);
+ final java.util.Optional<StreamsQuery> streamsQuery = streamsClient.getGetQuery().getQuery(id);
+ if(!streamsQuery.isPresent()) {
+ throw new RuntimeException("No Rya Streams query exists for ID " + queryId);
+ }
+
+ // Ensure it isn't already stopped.
+ if(!streamsQuery.get().isActive()) {
+ return "That query is already stopped.";
+ }
+
+ // Stop it.
+ streamsClient.getStopQuery().stop(id);
+ return "The query will no longer be processed by the Rya Streams subsystem.";
+
+ } catch (final RyaStreamsException e) {
+ throw new RuntimeException("Unable to start the Query.", e);
+ }
+ }
+
+ @CliCommand(value = STREAM_QUERIES_LIST_CMD, help = "List the queries that are being managed by the configured Rya Streams subsystem.")
+ public String listQueries() {
+ final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+ try {
+ final Set<StreamsQuery> queries = streamsClient.getListQueries().all();
+ return StreamsQueryFormatter.format(queries);
+ } catch (final RyaStreamsException e) {
+ throw new RuntimeException("Unable to fetch the queries from the Rya Streams subsystem.", e);
+ } catch (final Exception e) {
+ throw new RuntimeException("Unable to print the query to the console.", e);
+ }
+ }
+
+ @CliCommand(value = STREAM_QUERIES_DETAILS_CMD, help = "Print detailed information about a specific query managed by the Rya Streams subsystem.")
+ public String printQueryDetails(
+ @CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query whose details will be printed.")
+ final String queryId) {
+ final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
+ final UUID id = UUID.fromString(queryId);
+ try {
+ final java.util.Optional<StreamsQuery> query = streamsClient.getGetQuery().getQuery(id);
+ if(!query.isPresent()) {
+ return "There is no query with the specified ID.";
+ }
+ return StreamsQueryFormatter.format(query.get());
+ } catch (final RyaStreamsException e) {
+ throw new RuntimeException("Unable to fetch the query from the Rya Streams subsystem.", e);
+ } catch (final Exception e) {
+ throw new RuntimeException("Unable to print the query to the console.", e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java b/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java
index bb22fd3..58bcfbe 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/SharedShellState.java
@@ -26,6 +26,8 @@
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.streams.api.RyaStreamsClient;
import com.google.common.base.Optional;
import com.mongodb.MongoClient;
@@ -159,6 +161,34 @@
}
/**
+ * This method indicates a shift into a state where the shell may have to interact with the Rya Streams subsystem.
+ * <p/>
+ * Stores the {@link RyaStreamsClient} all Rya Streams commands will be executed against.
+ *
+ * @param ryaStreamsCommands - Rya Streams commands that will execute against the Rya Streams subsystem. (not null)
+ */
+ public void connectedToRyaStreams(
+ final RyaStreamsClient ryaStreamsCommands) {
+ requireNonNull(ryaStreamsCommands);
+
+ lock.lock();
+ try {
+ // Verify the Rya Shell is connected to an instance.
+ if(shellState.getConnectionState() != ConnectionState.CONNECTED_TO_INSTANCE) {
+ throw new IllegalStateException("You can not set the connected Rya Streams Client before connected to a Rya Instance.");
+ }
+
+ // Set the connected Rya Streams commands.
+ shellState = ShellState.builder( shellState )
+ .setRyaStreamsCommands(ryaStreamsCommands)
+ .build();
+
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
* This method indicates a shift into the {@link DISCONNECTED} state.
* <p/>
* Clears all of the values associated with a Rya Storage/Instance connection.
@@ -225,6 +255,7 @@
private final Optional<MongoConnectionDetails> mongoDetails;
private final Optional<MongoClient> mongoAdminClient;
private final Optional<RyaClient> connectedCommands;
+ private final Optional<RyaStreamsClient> ryaStreamsCommands;
// Instance specific values.
private final Optional<String> instanceName;
@@ -236,7 +267,8 @@
final Optional<MongoConnectionDetails> mongoDetails,
final Optional<MongoClient> mongoAdminClient,
final Optional<RyaClient> connectedCommands,
- final Optional<String> instanceName) {
+ final Optional<String> instanceName,
+ final Optional<RyaStreamsClient> ryaStreamsCommands) {
this.connectionState = requireNonNull(connectionState);
this.storageType = requireNonNull(storageType);
this.accumuloDetails = requireNonNull(accumuloDetails);
@@ -244,6 +276,7 @@
this.mongoAdminClient = requireNonNull(mongoAdminClient);
this.connectedCommands = requireNonNull(connectedCommands);
this.instanceName = requireNonNull(instanceName);
+ this.ryaStreamsCommands = requireNonNull(ryaStreamsCommands);
}
/**
@@ -294,6 +327,15 @@
}
/**
+ * @return The {@link RyaStreamsClient} to use when a command on the shell is issued.
+ * The value will not be present if the Rya Shell is not connected to an instance
+ * whose {@link RyaDetails} indicate when Rya Streams system to use.
+ */
+ public Optional<RyaStreamsClient> getRyaStreamsCommands() {
+ return ryaStreamsCommands;
+ }
+
+ /**
* @return The name of the Rya Instance the Rya Shell is issuing commands to.
* The value will not be present if the Rya Shell is not connected to a
* storage or if a target instance has not been set yet.
@@ -353,6 +395,7 @@
private MongoConnectionDetails mongoDetails;
private MongoClient mongoAdminClient;
private RyaClient connectedCommands;
+ private RyaStreamsClient ryaStreamsCommands;
// Instance specific values.
private String instanceName;
@@ -375,6 +418,7 @@
this.mongoDetails = shellState.getMongoDetails().orNull();
this.mongoAdminClient = shellState.getMongoAdminClient().orNull();
this.connectedCommands = shellState.getConnectedCommands().orNull();
+ this.ryaStreamsCommands = shellState.getRyaStreamsCommands().orNull();
this.instanceName = shellState.getRyaInstanceName().orNull();
}
@@ -435,6 +479,15 @@
}
/**
+ * @param ryaStreamsCommands - The {@link RyaStreamsClient} to use when a command on the shell is issued.
+ * @return This {@link Builder} so that method invocations may be chained.
+ */
+ public Builder setRyaStreamsCommands(@Nullable final RyaStreamsClient ryaStreamsCommands) {
+ this.ryaStreamsCommands = ryaStreamsCommands;
+ return this;
+ }
+
+ /**
* @param instanceName - The name of the Rya Instance the Rya Shell is issuing commands to.
* @return This {@link Builder} so that method invocations may be chained.
*/
@@ -454,7 +507,8 @@
Optional.fromNullable(mongoDetails),
Optional.fromNullable(mongoAdminClient),
Optional.fromNullable(connectedCommands),
- Optional.fromNullable(instanceName));
+ Optional.fromNullable(instanceName),
+ Optional.fromNullable(ryaStreamsCommands));
}
}
}
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
new file mode 100644
index 0000000..6c06caf
--- /dev/null
+++ b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
+
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Pretty formats {@link StreamsQuery} objects.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class StreamsQueryFormatter {
+
+ /**
+ * Pretty formats a {@link StreamsQuery}.
+ *
+ * @param query - The query to format. (not null)
+ * @return The pretty formatted string.
+ * @throws Exception A problem was encountered while pretty formatting the SPARQL.
+ */
+ public static String format(final StreamsQuery query) throws Exception {
+ requireNonNull(query);
+
+ // Pretty format the SPARQL query.
+ final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(query.getSparql(), null);
+ final String prettySparql = new SPARQLQueryRenderer().render(parsedQuery);
+ final String[] lines = prettySparql.split("\n");
+
+ // Create the formatted string.
+ query.getQueryId();
+ query.isActive();
+
+ String.format(" QueryId: %s", query.getQueryId());
+
+ final StringBuilder builder = new StringBuilder();
+ builder.append(" Query ID: ").append( query.getQueryId() ) .append("\n");
+ builder.append("Is Active: ").append( query.isActive() ).append("\n");
+ builder.append(" SPARQL: ").append( lines[0] ).append("\n");
+
+ for(int i = 1; i < lines.length; i++) {
+ builder.append(" ").append(lines[i]).append("\n");
+ }
+ return builder.toString();
+ }
+
+ /**
+ * Pretty formats a collection {@link StreamsQuery}s.
+ * They will be sorted based on their Query IDs.
+ *
+ * @param queries - The queries to format. (not null)
+ * @return The pretty formatted string.
+ * @throws Exception A problem was encountered while pretty formatting the SPARQL.
+ */
+ public static String format(final Collection<StreamsQuery> queries) throws Exception {
+ requireNonNull(queries);
+
+ if(queries.size() == 1) {
+ return format(queries.iterator().next());
+ }
+
+ // Sort the queries based on their IDs.
+ final List<StreamsQuery> sorted = Lists.newArrayList(queries);
+ sorted.sort((query1, query2) -> {
+ final String id1 = query1.getQueryId().toString();
+ final String id2 = query2.getQueryId().toString();
+ return id1.compareTo(id2);
+ });
+
+ // Format a list of the queries.
+ final StringBuilder builder = new StringBuilder();
+ builder.append("-----------------------------------------------\n");
+ for(final StreamsQuery query : sorted) {
+ builder.append( format(query) );
+ builder.append("-----------------------------------------------\n");
+ }
+ return builder.toString();
+ }
+}
\ No newline at end of file
diff --git a/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml b/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml
index 361bf27..2473af1 100644
--- a/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml
+++ b/extras/shell/src/main/resources/META-INF/spring/spring-shell-plugin.xml
@@ -31,7 +31,6 @@
<!-- Define the shell state bean that will be shared across all of the commands. -->
<bean id="sharedShellState" class="org.apache.rya.shell.SharedShellState" />
<bean id="passwordPrompt" class="org.apache.rya.shell.util.PasswordPrompt.JLinePasswordPrompt" />
-<!-- <bean id="installPrompt" class="org.apache.rya.shell.util.InstallPrompt.JLineAccumuloInstallPrompt" /> -->
<bean id="installPrompt" class="org.apache.rya.shell.util.InstallPrompt.JLineInstallPropmpt" />
<bean id="uninstallPrompt" class="org.apache.rya.shell.util.UninstallPrompt.JLineUninstallPrompt" />
<bean id="sparqlPrompt" class="org.apache.rya.shell.util.SparqlPrompt.JLineSparqlPrompt" />
@@ -41,11 +40,11 @@
<bean id="ryaConnectionCommands" class="org.apache.rya.shell.RyaConnectionCommands" />
<bean id="ryaAdminCommands" class="org.apache.rya.shell.RyaAdminCommands" />
<bean id="ryaCommands" class="org.apache.rya.shell.RyaCommands" />
+ <bean id="ryaStreamsCommands" class="org.apache.rya.shell.RyaStreamsCommands" />
<!--
<bean id="springHelpCommands" class="org.springframework.shell.commands.HelpCommands" />
<bean id="springScriptCommands" class="org.springframework.shell.commands.ScriptCommands" />
<bean id="springExitCommands" class="org.springframework.shell.commands.ExitCommands" />
-->
-
</beans>
\ No newline at end of file
diff --git a/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java b/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java
new file mode 100644
index 0000000..9f5a794
--- /dev/null
+++ b/extras/shell/src/test/java/org/apache/rya/shell/RyaStreamsCommandsTest.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.GetInstanceDetails;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.SetRyaStreamsConfiguration;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.shell.util.SparqlPrompt;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.interactor.AddQuery;
+import org.apache.rya.streams.api.interactor.DeleteQuery;
+import org.apache.rya.streams.api.interactor.GetQuery;
+import org.apache.rya.streams.api.interactor.ListQueries;
+import org.apache.rya.streams.api.interactor.StartQuery;
+import org.apache.rya.streams.api.interactor.StopQuery;
+import org.junit.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link RyaStreamsCommands}.
+ */
+public class RyaStreamsCommandsTest {
+
+ @Test
+ public void configureRyaStreams() throws Exception {
+ // Mock the object that performs the configure operation.
+ final RyaClient mockCommands = mock(RyaClient.class);
+ final SetRyaStreamsConfiguration setStreams = mock(SetRyaStreamsConfiguration.class);
+ when(mockCommands.getSetRyaStreamsConfiguration()).thenReturn(setStreams);
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands);
+ state.connectedToInstance("unitTest");
+
+ // Verify that no Rya Streams Client is set to the state.
+ assertFalse(state.getShellState().getRyaStreamsCommands().isPresent());
+
+ try {
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final String message = commands.configureRyaStreams("localhost", 6);
+
+ // Verify the request was forwarded to the mocked interactor.
+ final RyaStreamsDetails expectedDetails = new RyaStreamsDetails("localhost", 6);
+ verify(setStreams).setRyaStreamsConfiguration(eq("unitTest"), eq(expectedDetails));
+
+ // Verify a RyaStreamsClient was created and added to the state.
+ assertTrue(state.getShellState().getRyaStreamsCommands().isPresent());
+
+ // Verify the correct message is reported.
+ final String expected = "The Rya Instance has been updated to use the provided Rya Streams subsystem. " +
+ "Rya Streams commands are now avaiable while connected to this instance.";
+ assertEquals(expected, message);
+ } finally {
+ state.getShellState().getRyaStreamsCommands().get().close();
+ }
+ }
+
+ @Test
+ public void printRyaStreamsDetails_noRyaDetails() throws Exception {
+ // Mock the object that performs the configure operation.
+ final RyaClient mockCommands = mock(RyaClient.class);
+ final GetInstanceDetails getDetails = mock(GetInstanceDetails.class);
+ when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails);
+
+ // When getting the instance details, ensure they are not found.
+ when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.absent());
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands);
+ state.connectedToInstance("unitTest");
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final String message = commands.printRyaStreamsDetails();
+ final String expected = "This instance does not have any Rya Details, so it is unable to be connected to the Rya Streams subsystem.";
+ assertEquals(expected, message);
+ }
+
+ @Test
+ public void printRyaStreamsDetails_notConfigured() throws Exception {
+ // Mock the object that performs the configure operation.
+ final RyaClient mockCommands = mock(RyaClient.class);
+ final GetInstanceDetails getDetails = mock(GetInstanceDetails.class);
+ when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails);
+
+ // When getting the instance details, ensure they do not have RyaStreamsDetails to print.
+ final RyaDetails details = mock(RyaDetails.class);
+ when(details.getRyaStreamsDetails()).thenReturn(Optional.absent());
+ when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.of(details));
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands);
+ state.connectedToInstance("unitTest");
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final String message = commands.printRyaStreamsDetails();
+ final String expected = "This instance of Rya has not been configured to use a Rya Streams subsystem.";
+ assertEquals(expected, message);
+ }
+
+ @Test
+ public void printRyaStreamsDetails_configured() throws Exception {
+ // Mock the object that performs the configure operation.
+ final RyaClient mockCommands = mock(RyaClient.class);
+ final GetInstanceDetails getDetails = mock(GetInstanceDetails.class);
+ when(mockCommands.getGetInstanceDetails()).thenReturn(getDetails);
+
+ // When getting the instance details, ensure they do have RyaStreamsDetails to print.
+ final RyaDetails details = mock(RyaDetails.class);
+ when(details.getRyaStreamsDetails()).thenReturn(Optional.of(new RyaStreamsDetails("localhost", 6)));
+ when(getDetails.getDetails(eq("unitTest"))).thenReturn(Optional.of(details));
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands);
+ state.connectedToInstance("unitTest");
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final String message = commands.printRyaStreamsDetails();
+ final String expected = "Kafka Hostname: localhost, Kafka Port: 6";
+ assertEquals(expected, message);
+ }
+
+ @Test
+ public void addQuery_userAbortsSparqlPrompt() throws Exception {
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final AddQuery addQuery = mock(AddQuery.class);
+ when(mockClient.getAddQuery()).thenReturn(addQuery);
+
+ // Mock a SPARQL prompt that a user aborts.
+ final SparqlPrompt prompt = mock(SparqlPrompt.class);
+ when(prompt.getSparql()).thenReturn(Optional.absent());
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt);
+ final String message = commands.addQuery(false);
+
+ // Verify a message is printed to the user.
+ assertEquals("", message);
+ }
+
+ @Test
+ public void addQuery() throws Exception {
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final AddQuery addQuery = mock(AddQuery.class);
+ when(mockClient.getAddQuery()).thenReturn(addQuery);
+
+ final StreamsQuery addedQuery = new StreamsQuery(UUID.randomUUID(), "query", true);
+ when(addQuery.addQuery(eq("query"), eq(true))).thenReturn(addedQuery);
+
+ // Mock a SPARQL prompt that a user entered a query through.
+ final SparqlPrompt prompt = mock(SparqlPrompt.class);
+ when(prompt.getSparql()).thenReturn(Optional.of("query"));
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, prompt);
+ final String message = commands.addQuery(false);
+
+ // Verify the interactor was invoked with the provided input.
+ verify(addQuery).addQuery("query", true);
+
+ // Verify a message is printed to the user.
+ final String expected = "The added query's ID is " + addedQuery.getQueryId();
+ assertEquals(expected, message);
+ }
+
+ @Test
+ public void deleteQuery() throws Exception {
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final DeleteQuery deleteQuery = mock(DeleteQuery.class);
+ when(mockClient.getDeleteQuery()).thenReturn(deleteQuery);
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final UUID queryId = UUID.randomUUID();
+ final String message = commands.deleteQuery(queryId.toString());
+
+ // Verify the interactor was invoked with the provided parameters.
+ verify(deleteQuery).delete(eq(queryId));
+
+ // Verify a message is printed to the user.
+ final String expected = "The query has been deleted.";
+ assertEquals(expected, message);
+ }
+
+ @Test
+ public void startQuery() throws Exception {
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final StartQuery startQuery = mock(StartQuery.class);
+ when(mockClient.getStartQuery()).thenReturn(startQuery);
+ final GetQuery getQuery = mock(GetQuery.class);
+ when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Report the query as not running.
+ final UUID queryId = UUID.randomUUID();
+ when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", false)));
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final String message = commands.startQuery(queryId.toString());
+
+ // Verify the interactor was invoked with the provided parameters.
+ verify(startQuery).start(queryId);
+
+ // Verify a message is printed to the user.
+ final String expected = "The query will be processed by the Rya Streams subsystem.";
+ assertEquals(expected, message);
+ }
+
+ @Test
+ public void startQuery_alreadyRunning() throws Exception{
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final StartQuery startQuery = mock(StartQuery.class);
+ when(mockClient.getStartQuery()).thenReturn(startQuery);
+ final GetQuery getQuery = mock(GetQuery.class);
+ when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Report the query as running.
+ final UUID queryId = UUID.randomUUID();
+ when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", true)));
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final String message = commands.startQuery(queryId.toString());
+
+ // Verify the interactor was not invoked.
+ verify(startQuery, never()).start(queryId);
+
+ // Verify a message is printed to the user.
+ final String expected = "That query is already running.";
+ assertEquals(expected, message);
+ }
+
+ @Test
+ public void stopQuery() throws Exception {
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final StopQuery stopQuery = mock(StopQuery.class);
+ when(mockClient.getStopQuery()).thenReturn(stopQuery);
+ final GetQuery getQuery = mock(GetQuery.class);
+ when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Report the query as running.
+ final UUID queryId = UUID.randomUUID();
+ when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", true)));
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final String message = commands.stopQuery(queryId.toString());
+
+ // Verify the interactor was invoked with the provided parameters.
+ verify(stopQuery).stop(queryId);
+
+ // Verify a message is printed to the user.
+ final String expected = "The query will no longer be processed by the Rya Streams subsystem.";
+ assertEquals(expected, message);
+ }
+
+ @Test
+ public void stopQuery_alreadyStopped() throws Exception {
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final StopQuery stopQuery = mock(StopQuery.class);
+ when(mockClient.getStopQuery()).thenReturn(stopQuery);
+ final GetQuery getQuery = mock(GetQuery.class);
+ when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Report the query as not running.
+ final UUID queryId = UUID.randomUUID();
+ when(getQuery.getQuery(eq(queryId))).thenReturn(java.util.Optional.of(new StreamsQuery(queryId, "sparql", false)));
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final String message = commands.stopQuery(queryId.toString());
+
+ // Verify the interactor was not invoked with the provided parameters.
+ verify(stopQuery, never()).stop(queryId);
+
+ // Verify a message is printed to the user.
+ final String expected = "That query is already stopped.";
+ assertEquals(expected, message);
+ }
+
+ @Test
+ public void listQueries() throws Exception {
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final ListQueries listQueries = mock(ListQueries.class);
+ when(mockClient.getListQueries()).thenReturn(listQueries);
+
+ final Set<StreamsQuery> queries = Sets.newHashSet(
+ new StreamsQuery(
+ UUID.fromString("33333333-3333-3333-3333-333333333333"),
+ "SELECT * WHERE { ?person <urn:worksAt> ?business . }",
+ true),
+ new StreamsQuery(
+ UUID.fromString("11111111-1111-1111-1111-111111111111"),
+ "SELECT * WHERE { ?a ?b ?c . }",
+ true),
+ new StreamsQuery(
+ UUID.fromString("22222222-2222-2222-2222-222222222222"),
+ "SELECT * WHERE { ?d ?e ?f . }",
+ false));
+ when(listQueries.all()).thenReturn(queries);
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final String message = commands.listQueries();
+
+ // Verify the correct report is returned.
+ final String expected =
+ "-----------------------------------------------\n" +
+ " Query ID: 11111111-1111-1111-1111-111111111111\n" +
+ "Is Active: true\n" +
+ " SPARQL: select ?a ?b ?c\n" +
+ " where {\n" +
+ " ?a ?b ?c.\n" +
+ " }\n" +
+ "-----------------------------------------------\n" +
+ " Query ID: 22222222-2222-2222-2222-222222222222\n" +
+ "Is Active: false\n" +
+ " SPARQL: select ?d ?e ?f\n" +
+ " where {\n" +
+ " ?d ?e ?f.\n" +
+ " }\n" +
+ "-----------------------------------------------\n" +
+ " Query ID: 33333333-3333-3333-3333-333333333333\n" +
+ "Is Active: true\n" +
+ " SPARQL: select ?person ?business\n" +
+ " where {\n" +
+ " ?person <urn:worksAt> ?business.\n" +
+ " }\n" +
+ "-----------------------------------------------\n";
+ assertEquals(expected, message);
+ }
+
+ @Test
+ public void printQueryDetails() throws Exception {
+ // Mock the object that performs the rya streams operation.
+ final RyaStreamsClient mockClient = mock(RyaStreamsClient.class);
+ final GetQuery getQuery = mock(GetQuery.class);
+ when(mockClient.getGetQuery()).thenReturn(getQuery);
+
+ final UUID queryId = UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa");
+ final StreamsQuery query = new StreamsQuery(queryId, "SELECT * WHERE { ?a ?b ?c . }", true);
+ when(getQuery.getQuery(queryId)).thenReturn(java.util.Optional.of(query));
+
+ // Mock a shell state and connect it to a Rya instance.
+ final SharedShellState state = new SharedShellState();
+ state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mock(RyaClient.class));
+ state.connectedToInstance("unitTest");
+ state.connectedToRyaStreams(mockClient);
+
+ // Execute the command.
+ final RyaStreamsCommands commands = new RyaStreamsCommands(state, mock(SparqlPrompt.class));
+ final String message = commands.printQueryDetails(queryId.toString());
+
+ // Verify the correct report is returned.
+ final String expected =
+ " Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" +
+ "Is Active: true\n" +
+ " SPARQL: select ?a ?b ?c\n" +
+ " where {\n" +
+ " ?a ?b ?c.\n" +
+ " }\n";
+ assertEquals(expected, message);
+ }
+}
\ No newline at end of file
diff --git a/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java b/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java
index b5f136c..e1ad039 100644
--- a/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java
+++ b/extras/shell/src/test/java/org/apache/rya/shell/SharedShellStateTest.java
@@ -25,6 +25,7 @@
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.shell.SharedShellState.ConnectionState;
import org.apache.rya.shell.SharedShellState.ShellState;
+import org.apache.rya.streams.api.RyaStreamsClient;
import org.junit.Test;
/**
@@ -163,4 +164,34 @@
.build();
assertEquals(expected, state.getShellState());
}
+
+ @Test(expected = IllegalStateException.class)
+ public void connectedToRyaStreams_notConnectedToInstance() throws Exception {
+ // Create a shell state that is not connected to an instance.
+ final SharedShellState state = new SharedShellState();
+
+ // Connecting to a Rya Streams system fails.
+ state.connectedToRyaStreams( mock(RyaStreamsClient.class) );
+ }
+
+ @Test
+ public void connectedToRyaStreams() {
+ // Create a shell state.
+ final SharedShellState state = new SharedShellState();
+
+ // Connect to Accumulo.
+ final AccumuloConnectionDetails connectionDetails = mock(AccumuloConnectionDetails.class);
+ final RyaClient connectedCommands = mock(RyaClient.class);
+ state.connectedToAccumulo(connectionDetails, connectedCommands);
+
+ // Connect to an Instance.
+ state.connectedToInstance("instance");
+
+ // Connect to Rya Streams for the instance.
+ final RyaStreamsClient streamsClient = mock(RyaStreamsClient.class);
+ state.connectedToRyaStreams(streamsClient);
+
+ // Verify the state.
+ assertEquals(streamsClient, state.getShellState().getRyaStreamsCommands().get());
+ }
}
\ No newline at end of file
diff --git a/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java b/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java
new file mode 100644
index 0000000..8e5d251
--- /dev/null
+++ b/extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link StreamsQueryFormatter}.
+ */
+public class StreamsQueryFormatterTest {
+
+ @Test
+ public void formatQuery() throws Exception {
+ // Format the query.
+ final StreamsQuery query = new StreamsQuery(
+ UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa"),
+ "SELECT * WHERE { ?a ?b ?c . }",
+ true);
+ final String formatted = StreamsQueryFormatter.format(query);
+
+ // Ensure it has the expected format.
+ final String expected =
+ " Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" +
+ "Is Active: true\n" +
+ " SPARQL: select ?a ?b ?c\n" +
+ " where {\n" +
+ " ?a ?b ?c.\n" +
+ " }\n";
+
+ assertEquals(expected, formatted);
+ }
+
+ @Test
+ public void formatQueries() throws Exception {
+ // Format the queries.
+ final Set<StreamsQuery> queries = Sets.newHashSet(
+ new StreamsQuery(
+ UUID.fromString("33333333-3333-3333-3333-333333333333"),
+ "SELECT * WHERE { ?person <urn:worksAt> ?business . }",
+ true),
+ new StreamsQuery(
+ UUID.fromString("11111111-1111-1111-1111-111111111111"),
+ "SELECT * WHERE { ?a ?b ?c . }",
+ true),
+ new StreamsQuery(
+ UUID.fromString("22222222-2222-2222-2222-222222222222"),
+ "SELECT * WHERE { ?d ?e ?f . }",
+ false));
+
+ final String formatted = StreamsQueryFormatter.format(queries);
+
+ // Ensure it has the expected format.
+ final String expected =
+ "-----------------------------------------------\n" +
+ " Query ID: 11111111-1111-1111-1111-111111111111\n" +
+ "Is Active: true\n" +
+ " SPARQL: select ?a ?b ?c\n" +
+ " where {\n" +
+ " ?a ?b ?c.\n" +
+ " }\n" +
+ "-----------------------------------------------\n" +
+ " Query ID: 22222222-2222-2222-2222-222222222222\n" +
+ "Is Active: false\n" +
+ " SPARQL: select ?d ?e ?f\n" +
+ " where {\n" +
+ " ?d ?e ?f.\n" +
+ " }\n" +
+ "-----------------------------------------------\n" +
+ " Query ID: 33333333-3333-3333-3333-333333333333\n" +
+ "Is Active: true\n" +
+ " SPARQL: select ?person ?business\n" +
+ " where {\n" +
+ " ?person <urn:worksAt> ?business.\n" +
+ " }\n" +
+ "-----------------------------------------------\n";
+ assertEquals(expected, formatted);
+ }
+}
\ No newline at end of file
diff --git a/extras/shell/src/test/resources/RyaShellTest-context.xml b/extras/shell/src/test/resources/RyaShellTest-context.xml
index f7ffe0f..54c44cf 100644
--- a/extras/shell/src/test/resources/RyaShellTest-context.xml
+++ b/extras/shell/src/test/resources/RyaShellTest-context.xml
@@ -60,4 +60,5 @@
<bean id="ryaConnectionCommands" class="org.apache.rya.shell.RyaConnectionCommands" />
<bean id="ryaCommands" class="org.apache.rya.shell.RyaCommands" />
<bean id="ryaAdminCommands" class="org.apache.rya.shell.RyaAdminCommands" />
+ <bean id="ryaStreamsCommands" class="org.apache.rya.shell.RyaStreamsCommands" />
</beans>
\ No newline at end of file