blob: fede1a98acd1a7dbdf0d58b28f99dc2f950f5606 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.shell;
import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.util.Set;
import java.util.UUID;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.RyaClientException;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
import org.apache.rya.api.utils.QueryInvestigator;
import org.apache.rya.shell.SharedShellState.ConnectionState;
import org.apache.rya.shell.util.ConsolePrinter;
import org.apache.rya.shell.util.SparqlPrompt;
import org.apache.rya.shell.util.StreamsQueryFormatter;
import org.apache.rya.streams.api.RyaStreamsClient;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory;
import org.openrdf.query.MalformedQueryException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import com.google.common.base.Optional;
/**
* Rya Shell commands used to interact with the Rya Streams subsystem.
*/
@Component
public class RyaStreamsCommands implements CommandMarker {
public static final String STREAMS_CONFIGURE_CMD = "streams-configure";
public static final String STREAMS_DETAILS_CMD = "streams-details";
public static final String STREAM_QUERIES_ADD_CMD = "streams-queries-add";
public static final String STREAM_QUERIES_DELETE_CMD = "streams-queries-delete";
public static final String STREAM_QUERIES_START_CMD = "streams-queries-start";
public static final String STREAM_QUERIES_STOP_CMD = "streams-queries-stop";
public static final String STREAM_QUERIES_LIST_CMD = "streams-queries-list";
public static final String STREAM_QUERIES_DETAILS_CMD = "streams-queries-details";
private final SharedShellState state;
private final SparqlPrompt sparqlPrompt;
private final ConsolePrinter consolePrinter;
/**
* Constructs an instance of {@link RyaStreamsCommands}.
*
* @param state - Holds shared state between all of the command classes. (not null)
* @param sparqlPrompt - Prompts a user for a SPARQL query. (not null)
* @param consolePrinter - Prints messages to the console. (not null)
*/
@Autowired
public RyaStreamsCommands(
final SharedShellState state,
final SparqlPrompt sparqlPrompt,
final ConsolePrinter consolePrinter) {
this.state = requireNonNull(state);
this.sparqlPrompt = requireNonNull(sparqlPrompt);
this.consolePrinter = requireNonNull(consolePrinter);
}
/**
* Enables commands that become available when connected to a Rya instance.
*/
@CliAvailabilityIndicator({
STREAMS_CONFIGURE_CMD,
STREAMS_DETAILS_CMD})
public boolean areConfigCommandsAvailable() {
return state.getShellState().getConnectionState() == ConnectionState.CONNECTED_TO_INSTANCE;
}
/**
* Enables commands that become available when a Rya instance has a configured Rya Streams subsystem to use.
*/
@CliAvailabilityIndicator({
STREAM_QUERIES_ADD_CMD,
STREAM_QUERIES_DELETE_CMD,
STREAM_QUERIES_START_CMD,
STREAM_QUERIES_STOP_CMD,
STREAM_QUERIES_LIST_CMD,
STREAM_QUERIES_DETAILS_CMD})
public boolean areQueriesCommandsAvailable() {
return state.getShellState().getRyaStreamsCommands().isPresent();
}
@CliCommand(value = STREAMS_CONFIGURE_CMD, help = "Connect a Rya Streams subsystem to a Rya Instance.")
public String configureRyaStreams(
@CliOption(key = {"kafkaHostname"}, mandatory = true, help = "The hostname of the Kafka Broker.")
final String kafkaHostname,
@CliOption(key = {"kafkaPort"}, mandatory = true, help = "The port of the Kafka Broker.")
final int kafkaPort) {
// If this instance was connected to a different Rya Streams subsystem, then close that client.
final Optional<RyaStreamsClient> oldClient = state.getShellState().getRyaStreamsCommands();
if(oldClient.isPresent()) {
try {
oldClient.get().close();
} catch (final Exception e) {
System.err.print("Warning: Could not close the old Rya Streams Client.");
e.printStackTrace();
}
}
// Update the Rya Details for the connected Rya Instance.
final String ryaInstance = state.getShellState().getRyaInstanceName().get();
final RyaClient ryaClient = state.getShellState().getConnectedCommands().get();
try {
final RyaStreamsDetails streamsDetails = new RyaStreamsDetails(kafkaHostname, kafkaPort);
ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, streamsDetails);
} catch (final RyaClientException e) {
throw new RuntimeException("Could not update the Rya instance's Rya Details to include the new " +
"information. This command failed to complete.", e);
}
// Connect a Rya Streams Client and set it in the shared state.
final RyaStreamsClient newClient = KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort);
state.connectedToRyaStreams(newClient);
// Return a message that indicates the operation was successful.
if(oldClient.isPresent()) {
return "The Rya Streams subsystem that this Rya instance uses has been changed. Any queries that were " +
"maintained by the previous subsystem will need to be migrated to the new one.";
} else {
return "The Rya Instance has been updated to use the provided Rya Streams subsystem. " +
"Rya Streams commands are now avaiable while connected to this instance.";
}
}
@CliCommand(value = STREAMS_DETAILS_CMD, help = "Print information about which Rya Streams subsystem the Rya instance is connected to.")
public String printRyaStreamsDetails() {
final String ryaInstance = state.getShellState().getRyaInstanceName().get();
final RyaClient client = state.getShellState().getConnectedCommands().get();
try {
// Handle the case where the instance does not have Rya Details.
final Optional<RyaDetails> details = client.getGetInstanceDetails().getDetails(ryaInstance);
if(!details.isPresent()) {
return "This instance does not have any Rya Details, so it is unable to be connected to the Rya Streams subsystem.";
}
// Print a message based on if the instance is connected to Rya Streams.
final Optional<RyaStreamsDetails> streamsDetails = details.get().getRyaStreamsDetails();
if(!streamsDetails.isPresent()) {
return "This instance of Rya has not been configured to use a Rya Streams subsystem.";
}
// Print the details about which Rya Streams subsystem is being used.
return "Kafka Hostname: " + streamsDetails.get().getHostname() + ", Kafka Port: " + streamsDetails.get().getPort();
} catch (final RyaClientException e) {
throw new RuntimeException("Could not fetch the Rya Details for this Rya instance.", e);
}
}
@CliCommand(value = STREAM_QUERIES_ADD_CMD, help = "Add a SPARQL query to the Rya Streams subsystem.")
public String addQuery(
@CliOption(key = {"inactive"}, mandatory = false, unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
help = "Setting this flag will add the query, but not run it. (default: false)")
final boolean inactive,
@CliOption(key = {"insert"}, mandatory = false, unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
help = "Setting this flag will insert the query's results back into Rya. (default: false)")
final boolean isInsert) {
final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
// Prompt the user for the SPARQL that defines the query.
try {
final Optional<String> sparql = sparqlPrompt.getSparql();
// If the user aborted the prompt, return.
if(!sparql.isPresent()) {
return "";
}
final boolean isConstructQuery = QueryInvestigator.isConstruct(sparql.get());
final boolean isInsertQuery = QueryInvestigator.isInsertWhere(sparql.get());
// If the user wants to insert a CONSTRUCT into Rya, print a warning.
if(isInsert && isConstructQuery) {
consolePrinter.println("WARNING: CONSTRUCT is part of the SPARQL Query API, so they do not normally");
consolePrinter.println("get written back to the triple store. Consider using an INSERT, which is");
consolePrinter.println("part of the SPARQL Update API, in the future.");
}
// If the user wants to use an INSERT query, but not insert it back into Rya, suggest using a construct.
if(!isInsert && isInsertQuery) {
consolePrinter.println("WARNING: INSERT is part of the SPARQL Update API, so they normally get written");
consolePrinter.println("back to the triple store. Consider using a CONSTRUCT, which is part of the");
consolePrinter.println("SPARQL Query API, in the future.");
}
// If the user wants to insert the query back into Rya, make sure it is a legal query to do that.
if(isInsert && !(isConstructQuery || isInsertQuery)) {
throw new RuntimeException("Only CONSTRUCT queries and INSERT updates may be inserted back to the triple store.");
}
final StreamsQuery streamsQuery = streamsClient.getAddQuery().addQuery(sparql.get(), !inactive, isInsert);
return "The added query's ID is " + streamsQuery.getQueryId();
} catch (final MalformedQueryException | IOException | RyaStreamsException e) {
throw new RuntimeException("Unable to add the SPARQL query to the Rya Streams subsystem.", e);
}
}
@CliCommand(value = STREAM_QUERIES_DELETE_CMD, help = "Delete a SPARQL query from the Rya Streams subsystem.")
public String deleteQuery(
@CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to remove.")
final String queryId) {
final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
final UUID id = UUID.fromString(queryId);
try {
streamsClient.getDeleteQuery().delete(id);
} catch (final RyaStreamsException e) {
throw new RuntimeException("Could not delete the query from the Rya Streams subsystem.", e);
}
return "The query has been deleted.";
}
@CliCommand(value = STREAM_QUERIES_START_CMD, help = "Start processing a SPARQL query using the Rya Streams subsystem.")
public String startQuery(
@CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to start processing.")
final String queryId) {
final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
try {
// Ensure the query exists.
final UUID id = UUID.fromString(queryId);
final java.util.Optional<StreamsQuery> streamsQuery = streamsClient.getGetQuery().getQuery(id);
if(!streamsQuery.isPresent()) {
throw new RuntimeException("No Rya Streams query exists for ID " + queryId);
}
// Ensure it isn't already started.
if(streamsQuery.get().isActive()) {
return "That query is already running.";
}
// Start it.
streamsClient.getStartQuery().start(id);
return "The query will be processed by the Rya Streams subsystem.";
} catch (final RyaStreamsException e) {
throw new RuntimeException("Unable to start the Query.", e);
}
}
@CliCommand(value = STREAM_QUERIES_STOP_CMD, help = "Stop processing a SPARQL query using the Rya Streams subsystem.")
public String stopQuery(
@CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query to stop processing.")
final String queryId) {
final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
try {
// Ensure the query exists.
final UUID id = UUID.fromString(queryId);
final java.util.Optional<StreamsQuery> streamsQuery = streamsClient.getGetQuery().getQuery(id);
if(!streamsQuery.isPresent()) {
throw new RuntimeException("No Rya Streams query exists for ID " + queryId);
}
// Ensure it isn't already stopped.
if(!streamsQuery.get().isActive()) {
return "That query is already stopped.";
}
// Stop it.
streamsClient.getStopQuery().stop(id);
return "The query will no longer be processed by the Rya Streams subsystem.";
} catch (final RyaStreamsException e) {
throw new RuntimeException("Unable to start the Query.", e);
}
}
@CliCommand(value = STREAM_QUERIES_LIST_CMD, help = "List the queries that are being managed by the configured Rya Streams subsystem.")
public String listQueries() {
final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
try {
final Set<StreamsQuery> queries = streamsClient.getListQueries().all();
return StreamsQueryFormatter.format(queries);
} catch (final RyaStreamsException e) {
throw new RuntimeException("Unable to fetch the queries from the Rya Streams subsystem.", e);
} catch (final Exception e) {
throw new RuntimeException("Unable to print the query to the console.", e);
}
}
@CliCommand(value = STREAM_QUERIES_DETAILS_CMD, help = "Print detailed information about a specific query managed by the Rya Streams subsystem.")
public String printQueryDetails(
@CliOption(key= {"queryId"}, mandatory = true, help = "The ID of the query whose details will be printed.")
final String queryId) {
final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
final UUID id = UUID.fromString(queryId);
try {
final java.util.Optional<StreamsQuery> query = streamsClient.getGetQuery().getQuery(id);
if(!query.isPresent()) {
return "There is no query with the specified ID.";
}
return StreamsQueryFormatter.format(query.get());
} catch (final RyaStreamsException e) {
throw new RuntimeException("Unable to fetch the query from the Rya Streams subsystem.", e);
} catch (final Exception e) {
throw new RuntimeException("Unable to print the query to the console.", e);
}
}
}