RYA-466 Update the Rya Streams Client to stream results to file.
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java b/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
new file mode 100644
index 0000000..2fbd09b
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.utils;
+
+import static java.util.Objects.requireNonNull;
+
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.parser.ParsedGraphQuery;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.QueryParserUtil;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A utility class that is used to glean insight into the structure of SPARQL queries.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryInvestigator {
+
+ private static final SPARQLParser PARSER = new SPARQLParser();
+
+ private QueryInvestigator() { }
+
+ /**
+ * Determines whether a SPARQL command is a CONSTRUCT or not.
+ *
+ * @param sparql - The SPARQL to evaluate. (not null)
+ * @return {@code true} if the provided SPARQL is a CONSTRUCT query; otherwise {@code false}.
+ * @throws MalformedQueryException The SPARQL is neither a well formed query or update.
+ */
+ public static boolean isConstruct(final String sparql) throws MalformedQueryException {
+ requireNonNull(sparql);
+
+ try {
+ // Constructs are queries, so try to create a ParsedQuery.
+ final ParsedQuery parsedQuery = PARSER.parseQuery(sparql, null);
+
+ // Check to see if the SPARQL looks like a CONSTRUCT query.
+ return parsedQuery instanceof ParsedGraphQuery;
+
+ } catch(final MalformedQueryException queryE) {
+ try {
+ // Maybe it's an update.
+ PARSER.parseUpdate(sparql, null);
+
+ // It was, so return false.
+ return false;
+
+ } catch(final MalformedQueryException updateE) {
+ // It's not. Actually malformed.
+ throw queryE;
+ }
+ }
+ }
+
+ /**
+ * Determines whether a SPARQL command is an INSERT with a WHERE clause or not.
+ *
+ * @param sparql - The SPARQL to evaluate. (not null)
+ * @return {@code true} if the provided SPARQL is an INSERT update; otherwise {@code false}.
+ * @throws MalformedQueryException The SPARQL is neither a well formed query or update.
+ */
+ public static boolean isInsertWhere(final String sparql) throws MalformedQueryException {
+ requireNonNull(sparql);
+
+ try {
+ // Inserts are updated, so try to create a ParsedUpdate.
+ PARSER.parseUpdate(sparql, null);
+ final String strippedOperation = QueryParserUtil.removeSPARQLQueryProlog(sparql.toLowerCase());
+ return strippedOperation.startsWith("insert");
+ } catch(final MalformedQueryException updateE) {
+ try {
+ // Maybe it's a query.
+ PARSER.parseQuery(sparql, null);
+
+ // It was, so return false.
+ return false;
+
+ } catch(final MalformedQueryException queryE) {
+ // It's not. Actually malformed.
+ throw updateE;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java b/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
new file mode 100644
index 0000000..fbe2ebe
--- /dev/null
+++ b/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.utils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.openrdf.query.MalformedQueryException;
+
+/**
+ * Unit tests the methods of {@link QueryInvestigator}.
+ */
+public class QueryInvestigatorTest {
+
+ @Test
+ public void isConstruct_true() throws Exception {
+ final String sparql =
+ "PREFIX vCard: <http://www.w3.org/2001/vcard-rdf/3.0#> " +
+ "PREFIX foaf: <http://xmlns.com/foaf/0.1/> " +
+ "CONSTRUCT { " +
+ "?X vCard:FN ?name . " +
+ "?X vCard:URL ?url . " +
+ "?X vCard:TITLE ?title . " +
+ "} " +
+ "FROM <http://www.w3.org/People/Berners-Lee/card> " +
+ "WHERE { " +
+ "OPTIONAL { ?X foaf:name ?name . FILTER isLiteral(?name) . } " +
+ "OPTIONAL { ?X foaf:homepage ?url . FILTER isURI(?url) . } " +
+ "OPTIONAL { ?X foaf:title ?title . FILTER isLiteral(?title) . } " +
+ "}";
+
+ assertTrue( QueryInvestigator.isConstruct(sparql) );
+ }
+
+ @Test
+ public void isConstruct_false_notAConstruct() throws Exception {
+ final String sparql = "SELECT * WHERE { ?a ?b ?c . }";
+ assertFalse( QueryInvestigator.isConstruct(sparql) );
+ }
+
+ @Test
+ public void isConstruct_false_notAConstructWithKeywords() throws Exception {
+ final String sparql =
+ "SELECT ?construct " +
+ "WHERE {" +
+ " ?construct <urn:built> <urn:skyscraper> ." +
+ "}";
+ assertFalse( QueryInvestigator.isConstruct(sparql) );
+ }
+
+ @Test
+ public void isConstruct_false_notAQuery() throws Exception {
+ final String sparql =
+ "PREFIX Sensor: <http://example.com/Equipment.owl#> " +
+ "INSERT { " +
+ "?subject Sensor:test2 ?newValue " +
+ "} WHERE {" +
+ "values (?oldValue ?newValue) {" +
+ "('testValue1' 'newValue1')" +
+ "('testValue2' 'newValue2')" +
+ "}" +
+ "?subject Sensor:test1 ?oldValue" +
+ "}";
+
+ assertFalse( QueryInvestigator.isConstruct(sparql) );
+ }
+
+ @Test(expected = MalformedQueryException.class)
+ public void isConstruct_false_malformed() throws MalformedQueryException {
+ assertFalse( QueryInvestigator.isConstruct("not sparql") );
+ }
+
+ @Test
+ public void isInsert_true() throws Exception {
+ final String sparql =
+ "PREFIX Sensor: <http://example.com/Equipment.owl#> " +
+ "INSERT { " +
+ "?subject Sensor:test2 ?newValue " +
+ "} WHERE {" +
+ "values (?oldValue ?newValue) {" +
+ "('testValue1' 'newValue1')" +
+ "('testValue2' 'newValue2')" +
+ "}" +
+ "?subject Sensor:test1 ?oldValue" +
+ "}";
+
+ assertTrue( QueryInvestigator.isInsertWhere(sparql) );
+ }
+
+ @Test
+ public void isInsert_false_notAnInsert() throws Exception {
+ final String sparql =
+ "PREFIX dc: <http://purl.org/dc/elements/1.1/> " +
+ "PREFIX xsd: <http://www.w3.org/2001/XMLSchema#> " +
+ "DELETE { " +
+ "?book ?p ?v " +
+ "} WHERE { " +
+ "?book dc:date ?date . " +
+ "FILTER ( ?date < \"2000-01-01T00:00:00\"^^xsd:dateTime ) " +
+ "?book ?p ?v " +
+ "}";
+
+ assertFalse( QueryInvestigator.isInsertWhere(sparql) );
+ }
+
+ @Test
+ public void isInsert_false_notAnInsertWithKeywords() throws Exception {
+ final String sparql =
+ "DELETE" +
+ "{ " +
+ " ?bookInsert ?p ?o" +
+ "}" +
+ "WHERE" +
+ "{ " +
+ " ?bookInsert <urn:datePrinted> ?datePrinted ." +
+ " FILTER ( ?datePrinted < \"2018-01-01T00:00:00\"^^xsd:dateTime )" +
+ " ?bookInsert ?p ?o" +
+ "}";
+
+ assertFalse( QueryInvestigator.isInsertWhere(sparql) );
+ }
+
+ @Test
+ public void isInsert_false_notAnUpdate() throws Exception {
+ final String sparql = "SELECT * WHERE { ?a ?b ?c . }";
+ assertFalse( QueryInvestigator.isInsertWhere(sparql) );
+ }
+
+ @Test(expected = MalformedQueryException.class)
+ public void isInsert_false_malformed() throws MalformedQueryException {
+ assertFalse( QueryInvestigator.isInsertWhere("not sparql") );
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/pom.xml b/extras/rya.streams/client/pom.xml
index 5bd7933..38c3c86 100644
--- a/extras/rya.streams/client/pom.xml
+++ b/extras/rya.streams/client/pom.xml
@@ -69,6 +69,10 @@
<groupId>org.openrdf.sesame</groupId>
<artifactId>sesame-rio-turtle</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-queryresultio-sparqljson</artifactId>
+ </dependency>
<!-- Third Party dependencies -->
<dependency>
@@ -92,6 +96,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.rya</groupId>
<artifactId>rya.test.kafka</artifactId>
<scope>test</scope>
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 3612dd0..8ae4e08 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,11 +20,18 @@
import static java.util.Objects.requireNonNull;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.api.utils.QueryInvestigator;
import org.apache.rya.streams.api.entity.QueryResultStream;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.interactor.GetQueryResultStream;
@@ -32,13 +39,13 @@
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryRepository;
import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.client.util.QueryResultsOutputUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.Reduced;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.parser.sparql.SPARQLParser;
@@ -65,14 +72,20 @@
@Parameter(names = {"--queryId", "-q"}, required = true, description = "The query whose results will be streamed to the console.")
private String queryId;
+ @Parameter(names = {"--file", "-f"}, required = false, description = "If provided, the output file the results will stream into.")
+ private String outputPath;
+
@Override
public String toString() {
final StringBuilder parameters = new StringBuilder();
parameters.append(super.toString());
if (!Strings.isNullOrEmpty(queryId)) {
- parameters.append("\tQuery ID: " + queryId);
- parameters.append("\n");
+ parameters.append("\tQuery ID: " + queryId + "\n");
+ }
+
+ if(!Strings.isNullOrEmpty(outputPath)) {
+ parameters.append("\tOutput Path: " + outputPath + "\n");
}
return parameters.toString();
@@ -160,10 +173,12 @@
});
// Build the interactor based on the type of result the query produces.
+ boolean isStatementResults = false;
+
final GetQueryResultStream<?> getQueryResultStream;
try {
- final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
- if(tupleExpr instanceof Reduced) {
+ isStatementResults = QueryInvestigator.isConstruct(sparql) | QueryInvestigator.isInsertWhere(sparql);
+ if(isStatementResults) {
getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityStatementDeserializer.class);
} else {
getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityBindingSetDeserializer.class);
@@ -172,12 +187,22 @@
throw new ExecutionException("Could not parse the SPARQL for the query: " + sparql, e);
}
- // Iterate through the results and print them to the console until the program or the stream ends.
- try (final QueryResultStream<?> stream = getQueryResultStream.fromStart(queryId)) {
- while(!finished.get()) {
- for(final Object result : stream.poll(1000)) {
- System.out.println(result);
+ // Iterate through the results and print them to the configured output mechanism.
+ try (final QueryResultStream<?> resultsStream = getQueryResultStream.fromStart(queryId)) {
+ final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
+ if(params.outputPath != null) {
+ final Path file = Paths.get(params.outputPath);
+ try (final OutputStream out = Files.newOutputStream(file)) {
+ if(isStatementResults) {
+ final QueryResultStream<VisibilityStatement> stmtStream = (QueryResultStream<VisibilityStatement>) resultsStream;
+ QueryResultsOutputUtil.toNtriplesFile(out, stmtStream, finished);
+ } else {
+ final QueryResultStream<VisibilityBindingSet> bsStream = (QueryResultStream<VisibilityBindingSet>) resultsStream;
+ QueryResultsOutputUtil.toBindingSetJSONFile(out, tupleExpr, bsStream, finished);
+ }
}
+ } else {
+ streamToSystemOut(resultsStream, finished);
}
} catch (final Exception e) {
System.err.println("Error while reading the results from the stream.");
@@ -185,4 +210,15 @@
System.exit(1);
}
}
+
+ private static void streamToSystemOut(final QueryResultStream<?> stream, final AtomicBoolean shutdownSignal) throws Exception {
+ requireNonNull(stream);
+ requireNonNull(shutdownSignal);
+
+ while(!shutdownSignal.get()) {
+ for(final Object result : stream.poll(1000)) {
+ System.out.println(result);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/util/QueryResultsOutputUtil.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/util/QueryResultsOutputUtil.java
new file mode 100644
index 0000000..114a6fe
--- /dev/null
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/util/QueryResultsOutputUtil.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.entity.QueryResultStream;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.openrdf.query.TupleQueryResultHandlerException;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.resultio.sparqljson.SPARQLResultsJSONWriter;
+import org.openrdf.rio.RDFFormat;
+import org.openrdf.rio.RDFHandlerException;
+import org.openrdf.rio.RDFWriter;
+import org.openrdf.rio.Rio;
+import org.openrdf.rio.WriterConfig;
+import org.openrdf.rio.helpers.BasicWriterSettings;
+
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A utility that writes {@link QueryResultStream} results to an {@link OutputStream}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryResultsOutputUtil {
+
+ /**
+ * Private constructor to prevent instantiation.
+ */
+ private QueryResultsOutputUtil() { }
+
+ /**
+ * Writes the results of a {@link QueryResultStream} to the output stream as NTriples until the
+ * shutdown signal is set.
+ *
+ * @param out - The stream the NTriples data will be written to. (not null)
+ * @param resultsStream - The results stream that will be polled for results to
+ * write to {@code out}. (not null)
+ * @param shutdownSignal - Setting this signal will cause the thread that
+ * is processing this function to finish and leave. (not null)
+ * @throws RDFHandlerException A problem was encountered while
+ * writing the NTriples to the output stream.
+ * @throws IllegalStateException The {@code resultsStream} is closed.
+ * @throws RyaStreamsException Could not fetch the next set of results.
+ */
+ public static void toNtriplesFile(
+ final OutputStream out,
+ final QueryResultStream<VisibilityStatement> resultsStream,
+ final AtomicBoolean shutdownSignal) throws RDFHandlerException, IllegalStateException, RyaStreamsException {
+ requireNonNull(out);
+ requireNonNull(resultsStream);
+ requireNonNull(shutdownSignal);
+
+ final RDFWriter writer = Rio.createWriter(RDFFormat.NTRIPLES, out);
+ writer.startRDF();
+
+ while(!shutdownSignal.get()) {
+ final Iterable<VisibilityStatement> it = resultsStream.poll(1000);
+ for(final VisibilityStatement result : it) {
+ writer.handleStatement(result);
+ }
+ }
+
+ writer.endRDF();
+ }
+
+ /**
+ * Writes the results of a {@link QueryResultStream} to the output stream as JSON until the
+ * shutdown signal is set.
+ *
+ * @param out - The stream the JSON will be written to. (not null)
+ * @param query - The parsed SPARQL Query whose results are being output. This
+ * object is used to figure out which bindings may appear. (not null)
+ * @param resultsStream - The results stream that will be polled for results to
+ * write to {@code out}. (not null)
+ * @param shutdownSignal - Setting this signal will cause the thread that
+ * is processing this function to finish and leave. (not null)
+ * @throws TupleQueryResultHandlerException A problem was encountered while
+ * writing the JSON to the output stream.
+ * @throws IllegalStateException The {@code resultsStream} is closed.
+ * @throws RyaStreamsException Could not fetch the next set of results.
+ */
+ public static void toBindingSetJSONFile(
+ final OutputStream out,
+ final TupleExpr query,
+ final QueryResultStream<VisibilityBindingSet> resultsStream,
+ final AtomicBoolean shutdownSignal) throws TupleQueryResultHandlerException, IllegalStateException, RyaStreamsException {
+ requireNonNull(out);
+ requireNonNull(query);
+ requireNonNull(resultsStream);
+ requireNonNull(shutdownSignal);
+
+ // Create a writer that does not pretty print.
+ final SPARQLResultsJSONWriter writer = new SPARQLResultsJSONWriter(out);
+ final WriterConfig config = writer.getWriterConfig();
+ config.set(BasicWriterSettings.PRETTY_PRINT, false);
+
+ // Start the JSON and enumerate the possible binding names.
+ writer.startQueryResult( Lists.newArrayList(query.getBindingNames()) );
+
+ while(!shutdownSignal.get()) {
+ for(final VisibilityBindingSet result : resultsStream.poll(1000)) {
+ writer.handleSolution(result);
+ }
+ }
+
+ writer.endQueryResult();
+ }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/util/QueryResultsOutputUtilTest.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/util/QueryResultsOutputUtilTest.java
new file mode 100644
index 0000000..b82e671
--- /dev/null
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/util/QueryResultsOutputUtilTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.entity.QueryResultStream;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Unit tests the methods of {@link QueryResultsOutputUtil}.
+ */
+public class QueryResultsOutputUtilTest {
+
+ private static final ValueFactory VF = new ValueFactoryImpl();
+
+ @Test
+ public void toNtriplesFile() throws Exception {
+ // Mock a result stream that signals shutdown when it returns a set of results.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+ final QueryResultStream<VisibilityStatement> resultsStream = mock(QueryResultStream.class);
+ when(resultsStream.poll(anyLong())).thenAnswer(invocation -> {
+ shutdownSignal.set(true);
+
+ final List<VisibilityStatement> results = new ArrayList<>();
+
+ Statement stmt = VF.createStatement(VF.createURI("urn:alice"), VF.createURI("urn:age"), VF.createLiteral(23));
+ results.add( new VisibilityStatement(stmt) );
+
+ stmt = VF.createStatement(VF.createURI("urn:bob"), VF.createURI("urn:worksAt"), VF.createLiteral("Taco Shop"));
+ results.add( new VisibilityStatement(stmt) );
+ return results;
+ });
+
+ // The stream the JSON will be written to.
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ // Invoke the test method. This will write the NTriples.
+ QueryResultsOutputUtil.toNtriplesFile(out, resultsStream, shutdownSignal);
+
+ // Show the produced NTriples matches the expected NTriples.
+ final String expected =
+ "<urn:alice> <urn:age> \"23\"^^<http://www.w3.org/2001/XMLSchema#int> .\n" +
+ "<urn:bob> <urn:worksAt> \"Taco Shop\" .\n";
+
+ final String nTriples = new String(out.toByteArray(), Charsets.UTF_8);
+ assertEquals(expected, nTriples);
+ }
+
+ @Test
+ public void toBindingSetJSONFile() throws Exception {
+ // A SPARQL query that uses OPTIONAL values.
+ final String sparql =
+ "SELECT * WHERE { " +
+ "?name <urn:worksAt> ?company . " +
+ "OPTIONAL{ ?name <urn:ssn> ?ssn} " +
+ "}";
+ final TupleExpr query = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
+
+ // Mock a results stream that signals shutdown when it returns a set of results.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+ final QueryResultStream<VisibilityBindingSet> resultsStream = mock(QueryResultStream.class);
+ when(resultsStream.poll(anyLong())).thenAnswer(invocation -> {
+ shutdownSignal.set(true);
+
+ final List<VisibilityBindingSet> results = new ArrayList<>();
+
+ // A result with the optional value.
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("name", VF.createLiteral("alice"));
+ bs.addBinding("company", VF.createLiteral("Taco Shop"));
+ bs.addBinding("ssn", VF.createURI("urn:111-11-1111"));
+ results.add(new VisibilityBindingSet(bs, ""));
+
+
+ // A result without the optional value.
+ bs = new MapBindingSet();
+ bs.addBinding("name", VF.createLiteral("bob"));
+ bs.addBinding("company", VF.createLiteral("Cafe"));
+ results.add(new VisibilityBindingSet(bs, ""));
+
+ return results;
+ });
+
+ // The stream the JSON will be written to.
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ // Invoke the test method. This will write the json.
+ QueryResultsOutputUtil.toBindingSetJSONFile(out, query, resultsStream, shutdownSignal);
+
+ // Show the produced JSON matches the expected JSON.
+ final String expected = "{\"head\":{\"vars\":[\"name\",\"company\",\"ssn\"]},\"results\":{" +
+ "\"bindings\":[{\"name\":{\"type\":\"literal\",\"value\":\"alice\"},\"company\":{" +
+ "\"type\":\"literal\",\"value\":\"Taco Shop\"},\"ssn\":{\"type\":\"uri\",\"value\":" +
+ "\"urn:111-11-1111\"}},{\"name\":{\"type\":\"literal\",\"value\":\"bob\"},\"company\"" +
+ ":{\"type\":\"literal\",\"value\":\"Cafe\"}}]}}";
+ final String json = new String(out.toByteArray(), Charsets.UTF_8);
+ System.out.println(json);
+ assertEquals(expected, json);
+ }
+}
\ No newline at end of file