RYA-466 Update the Rya Streams Client to stream results to file.
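
Adds a QueryInvestigator utility to rya.api that determines whether a SPARQL
command is a CONSTRUCT query or an INSERT with a WHERE clause, and uses it in
the Rya Streams client's StreamResultsCommand to decide whether a query
produces statements or binding sets. The command gains an optional --file/-f
parameter; when it is supplied, statement results are written to the file as
NTriples and binding set results as SPARQL Results JSON via the new
QueryResultsOutputUtil. Otherwise results continue to print to the console.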
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java b/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
new file mode 100644
index 0000000..2fbd09b
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.utils;
+
+import static java.util.Objects.requireNonNull;
+
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.parser.ParsedGraphQuery;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.QueryParserUtil;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A utility class that is used to glean insight into the structure of SPARQL queries.
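+ *
+ * <p>
+ * Illustrative usage, mirroring how the Rya Streams client decides whether a
+ * query produces statements or binding sets ({@code sparql} is assumed to hold
+ * the SPARQL text being inspected):
+ * <pre>
+ * if (QueryInvestigator.isConstruct(sparql) || QueryInvestigator.isInsertWhere(sparql)) {
+ *     // The query emits statements rather than binding sets.
+ * }
+ * </pre>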
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryInvestigator {
+
+    private static final SPARQLParser PARSER = new SPARQLParser();
+
+    private QueryInvestigator() { }
+
+    /**
+     * Determines whether a SPARQL command is a CONSTRUCT or not.
+     *
+     * @param sparql - The SPARQL to evaluate. (not null)
+     * @return {@code true} if the provided SPARQL is a CONSTRUCT query; otherwise {@code false}.
+     * @throws MalformedQueryException The SPARQL is neither a well-formed query nor a well-formed update.
+     */
+    public static boolean isConstruct(final String sparql) throws MalformedQueryException {
+        requireNonNull(sparql);
+
+        try {
+            // Constructs are queries, so try to create a ParsedQuery.
+            final ParsedQuery parsedQuery = PARSER.parseQuery(sparql, null);
+
+            // Check to see if the SPARQL looks like a CONSTRUCT query.
+            return parsedQuery instanceof ParsedGraphQuery;
+
+        } catch(final MalformedQueryException queryE) {
+            try {
+                // Maybe it's an update.
+                PARSER.parseUpdate(sparql, null);
+
+                // It was, so return false.
+                return false;
+
+            } catch(final MalformedQueryException updateE) {
+                // It's not. Actually malformed.
+                throw queryE;
+            }
+        }
+    }
+
+    /**
+     * Determines whether a SPARQL command is an INSERT with a WHERE clause or not.
+     *
+     * @param sparql - The SPARQL to evaluate. (not null)
+     * @return {@code true} if the provided SPARQL is an INSERT with a WHERE clause; otherwise {@code false}.
+     * @throws MalformedQueryException The SPARQL is neither a well-formed query nor a well-formed update.
+     */
+    public static boolean isInsertWhere(final String sparql) throws MalformedQueryException {
+        requireNonNull(sparql);
+
+        try {
+            // Inserts are updates, so try to create a ParsedUpdate.
+            PARSER.parseUpdate(sparql, null);
+            final String strippedOperation = QueryParserUtil.removeSPARQLQueryProlog(sparql.toLowerCase());
+            return strippedOperation.startsWith("insert");
+        } catch(final MalformedQueryException updateE) {
+            try {
+                // Maybe it's a query.
+                PARSER.parseQuery(sparql, null);
+
+                // It was, so return false.
+                return false;
+
+            } catch(final MalformedQueryException queryE) {
+                // It's not. Actually malformed.
+                throw updateE;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java b/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
new file mode 100644
index 0000000..fbe2ebe
--- /dev/null
+++ b/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.utils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.openrdf.query.MalformedQueryException;
+
+/**
+ * Unit tests the methods of {@link QueryInvestigator}.
+ */
+public class QueryInvestigatorTest {
+
+    @Test
+    public void isConstruct_true() throws Exception {
+        final String sparql =
+                "PREFIX vCard: <http://www.w3.org/2001/vcard-rdf/3.0#> " +
+                "PREFIX foaf: <http://xmlns.com/foaf/0.1/> " +
+                "CONSTRUCT { " +
+                    "?X vCard:FN ?name . " +
+                    "?X vCard:URL ?url . " +
+                    "?X vCard:TITLE ?title . " +
+                "} " +
+                "FROM <http://www.w3.org/People/Berners-Lee/card> " +
+                "WHERE { " +
+                    "OPTIONAL { ?X foaf:name ?name . FILTER isLiteral(?name) . } " +
+                    "OPTIONAL { ?X foaf:homepage ?url . FILTER isURI(?url) . } " +
+                    "OPTIONAL { ?X foaf:title ?title . FILTER isLiteral(?title) . } " +
+                "}";
+
+        assertTrue( QueryInvestigator.isConstruct(sparql) );
+    }
+
+    @Test
+    public void isConstruct_false_notAConstruct() throws Exception {
+        final String sparql = "SELECT * WHERE { ?a ?b ?c . }";
+        assertFalse( QueryInvestigator.isConstruct(sparql) );
+    }
+
+    @Test
+    public void isConstruct_false_notAConstructWithKeywords() throws Exception {
+        final String sparql =
+                "SELECT ?construct " +
+                "WHERE {" +
+                "   ?construct <urn:built> <urn:skyscraper> ." +
+                "}";
+        assertFalse( QueryInvestigator.isConstruct(sparql) );
+    }
+
+    @Test
+    public void isConstruct_false_notAQuery() throws Exception {
+        final String sparql =
+                "PREFIX Sensor: <http://example.com/Equipment.owl#> " +
+                "INSERT { " +
+                    "?subject Sensor:test2 ?newValue " +
+                "} WHERE {" +
+                    "values (?oldValue ?newValue) {" +
+                        "('testValue1' 'newValue1')" +
+                        "('testValue2' 'newValue2')" +
+                    "}" +
+                    "?subject Sensor:test1 ?oldValue" +
+                "}";
+
+        assertFalse( QueryInvestigator.isConstruct(sparql) );
+    }
+
+    @Test(expected = MalformedQueryException.class)
+    public void isConstruct_false_malformed() throws MalformedQueryException {
+        assertFalse( QueryInvestigator.isConstruct("not sparql") );
+    }
+
+    @Test
+    public void isInsert_true() throws Exception {
+        final String sparql =
+                "PREFIX Sensor: <http://example.com/Equipment.owl#> " +
+                "INSERT { " +
+                    "?subject Sensor:test2 ?newValue " +
+                "} WHERE {" +
+                    "values (?oldValue ?newValue) {" +
+                        "('testValue1' 'newValue1')" +
+                        "('testValue2' 'newValue2')" +
+                    "}" +
+                    "?subject Sensor:test1 ?oldValue" +
+                "}";
+
+        assertTrue( QueryInvestigator.isInsertWhere(sparql) );
+    }
+
+    @Test
+    public void isInsert_false_notAnInsert() throws Exception {
+        final String sparql =
+                "PREFIX dc:  <http://purl.org/dc/elements/1.1/> " +
+                "PREFIX xsd: <http://www.w3.org/2001/XMLSchema#> " +
+                "DELETE { " +
+                    "?book ?p ?v " +
+                "} WHERE { " +
+                    "?book dc:date ?date . " +
+                    "FILTER ( ?date < \"2000-01-01T00:00:00\"^^xsd:dateTime ) " +
+                    "?book ?p ?v " +
+                "}";
+
+        assertFalse( QueryInvestigator.isInsertWhere(sparql) );
+    }
+
+    @Test
+    public void isInsert_false_notAnInsertWithKeywords() throws Exception {
+        final String sparql =
+                "DELETE" +
+                "{ " +
+                "    ?bookInsert ?p ?o" +
+                "}" +
+                "WHERE" +
+                "{ " +
+                "    ?bookInsert <urn:datePrinted> ?datePrinted  ." +
+                "    FILTER ( ?datePrinted < \"2018-01-01T00:00:00\"^^xsd:dateTime )" +
+                "    ?bookInsert ?p ?o" +
+                "}";
+
+        assertFalse( QueryInvestigator.isInsertWhere(sparql) );
+    }
+
+    @Test
+    public void isInsert_false_notAnUpdate() throws Exception {
+        final String sparql = "SELECT * WHERE { ?a ?b ?c . }";
+        assertFalse( QueryInvestigator.isInsertWhere(sparql) );
+    }
+
+    @Test(expected = MalformedQueryException.class)
+    public void isInsert_false_malformed() throws MalformedQueryException {
+        assertFalse( QueryInvestigator.isInsertWhere("not sparql") );
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/pom.xml b/extras/rya.streams/client/pom.xml
index 5bd7933..38c3c86 100644
--- a/extras/rya.streams/client/pom.xml
+++ b/extras/rya.streams/client/pom.xml
@@ -69,6 +69,10 @@
             <groupId>org.openrdf.sesame</groupId>
             <artifactId>sesame-rio-turtle</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryresultio-sparqljson</artifactId>
+        </dependency>
             
         <!-- Third Party dependencies -->
         <dependency>
@@ -92,6 +96,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.test.kafka</artifactId>
             <scope>test</scope>
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 3612dd0..8ae4e08 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,11 +20,18 @@
 
 import static java.util.Objects.requireNonNull;
 
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.api.utils.QueryInvestigator;
 import org.apache.rya.streams.api.entity.QueryResultStream;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.interactor.GetQueryResultStream;
@@ -32,13 +39,13 @@
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryRepository;
 import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.client.util.QueryResultsOutputUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
 import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.Reduced;
 import org.openrdf.query.algebra.TupleExpr;
 import org.openrdf.query.parser.sparql.SPARQLParser;
 
@@ -65,14 +72,20 @@
         @Parameter(names = {"--queryId", "-q"}, required = true, description = "The query whose results will be streamed to the console.")
         private String queryId;
 
+        @Parameter(names = {"--file", "-f"}, required = false, description = "If provided, the file the results will be written to instead of the console.")
+        private String outputPath;
+
         @Override
         public String toString() {
             final StringBuilder parameters = new StringBuilder();
             parameters.append(super.toString());
 
             if (!Strings.isNullOrEmpty(queryId)) {
-                parameters.append("\tQuery ID: " + queryId);
-                parameters.append("\n");
+                parameters.append("\tQuery ID: " + queryId + "\n");
+            }
+
+            if(!Strings.isNullOrEmpty(outputPath)) {
+                parameters.append("\tOutput Path: " + outputPath + "\n");
             }
 
             return parameters.toString();
@@ -160,10 +173,12 @@
         });
 
         // Build the interactor based on the type of result the query produces.
+        boolean isStatementResults = false;
+
         final GetQueryResultStream<?> getQueryResultStream;
         try {
-            final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
-            if(tupleExpr instanceof Reduced) {
+            isStatementResults = QueryInvestigator.isConstruct(sparql) || QueryInvestigator.isInsertWhere(sparql);
+            if(isStatementResults) {
                 getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityStatementDeserializer.class);
             } else {
                 getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityBindingSetDeserializer.class);
@@ -172,12 +187,22 @@
             throw new ExecutionException("Could not parse the SPARQL for the query: " + sparql, e);
         }
 
-        // Iterate through the results and print them to the console until the program or the stream ends.
-        try (final QueryResultStream<?> stream = getQueryResultStream.fromStart(queryId)) {
-            while(!finished.get()) {
-                for(final Object result : stream.poll(1000)) {
-                    System.out.println(result);
+        // Iterate through the results and print them to the configured output mechanism.
+        try (final QueryResultStream<?> resultsStream = getQueryResultStream.fromStart(queryId)) {
+            final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
+            if(params.outputPath != null) {
+                final Path file = Paths.get(params.outputPath);
+                try (final OutputStream out = Files.newOutputStream(file)) {
+                    if(isStatementResults) {
+                        final QueryResultStream<VisibilityStatement> stmtStream = (QueryResultStream<VisibilityStatement>) resultsStream;
+                        QueryResultsOutputUtil.toNtriplesFile(out, stmtStream, finished);
+                    } else {
+                        final QueryResultStream<VisibilityBindingSet> bsStream = (QueryResultStream<VisibilityBindingSet>) resultsStream;
+                        QueryResultsOutputUtil.toBindingSetJSONFile(out, tupleExpr, bsStream, finished);
+                    }
                 }
+            } else {
+                streamToSystemOut(resultsStream, finished);
             }
         } catch (final Exception e) {
             System.err.println("Error while reading the results from the stream.");
@@ -185,4 +210,15 @@
             System.exit(1);
         }
     }
+
+    private static void streamToSystemOut(final QueryResultStream<?> stream, final AtomicBoolean shutdownSignal) throws Exception {
+        requireNonNull(stream);
+        requireNonNull(shutdownSignal);
+
+        while(!shutdownSignal.get()) {
+            for(final Object result : stream.poll(1000)) {
+                System.out.println(result);
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/util/QueryResultsOutputUtil.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/util/QueryResultsOutputUtil.java
new file mode 100644
index 0000000..114a6fe
--- /dev/null
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/util/QueryResultsOutputUtil.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.entity.QueryResultStream;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.openrdf.query.TupleQueryResultHandlerException;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.resultio.sparqljson.SPARQLResultsJSONWriter;
+import org.openrdf.rio.RDFFormat;
+import org.openrdf.rio.RDFHandlerException;
+import org.openrdf.rio.RDFWriter;
+import org.openrdf.rio.Rio;
+import org.openrdf.rio.WriterConfig;
+import org.openrdf.rio.helpers.BasicWriterSettings;
+
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A utility that writes {@link QueryResultStream} results to an {@link OutputStream}.
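+ *
+ * <p>
+ * Illustrative usage, following how the Rya Streams client writes statement
+ * results to a file ({@code statementsStream} and {@code shutdownSignal} are
+ * assumed to already exist):
+ * <pre>
+ * try (OutputStream out = Files.newOutputStream(Paths.get("results.nt"))) {
+ *     QueryResultsOutputUtil.toNtriplesFile(out, statementsStream, shutdownSignal);
+ * }
+ * </pre>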
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryResultsOutputUtil {
+
+    /**
+     * Private constructor to prevent instantiation.
+     */
+    private QueryResultsOutputUtil() { }
+
+    /**
+     * Writes the results of a {@link QueryResultStream} to the output stream as NTriples until the
+     * shutdown signal is set.
+     *
+     * @param out - The stream the NTriples data will be written to. (not null)
+     * @param resultsStream - The results stream that will be polled for results to
+     *   write to {@code out}. (not null)
+     * @param shutdownSignal - Setting this signal will cause the thread that
+     *   is processing this function to finish and return. (not null)
+     * @throws RDFHandlerException A problem was encountered while
+     *   writing the NTriples to the output stream.
+     * @throws IllegalStateException The {@code resultsStream} is closed.
+     * @throws RyaStreamsException Could not fetch the next set of results.
+     */
+    public static void toNtriplesFile(
+            final OutputStream out,
+            final QueryResultStream<VisibilityStatement> resultsStream,
+            final AtomicBoolean shutdownSignal) throws RDFHandlerException, IllegalStateException, RyaStreamsException {
+        requireNonNull(out);
+        requireNonNull(resultsStream);
+        requireNonNull(shutdownSignal);
+
+        final RDFWriter writer = Rio.createWriter(RDFFormat.NTRIPLES, out);
+        writer.startRDF();
+
+        while(!shutdownSignal.get()) {
+            final Iterable<VisibilityStatement> it = resultsStream.poll(1000);
+            for(final VisibilityStatement result : it) {
+                writer.handleStatement(result);
+            }
+        }
+
+        writer.endRDF();
+    }
+
+    /**
+     * Writes the results of a {@link QueryResultStream} to the output stream as JSON until the
+     * shutdown signal is set.
+     *
+     * @param out - The stream the JSON will be written to. (not null)
+     * @param query - The parsed SPARQL Query whose results are being output. This
+     *   object is used to figure out which bindings may appear. (not null)
+     * @param resultsStream - The results stream that will be polled for results to
+     *   write to {@code out}. (not null)
+     * @param shutdownSignal - Setting this signal will cause the thread that
+     *   is processing this function to finish and return. (not null)
+     * @throws TupleQueryResultHandlerException A problem was encountered while
+     *   writing the JSON to the output stream.
+     * @throws IllegalStateException The {@code resultsStream} is closed.
+     * @throws RyaStreamsException Could not fetch the next set of results.
+     */
+    public static void toBindingSetJSONFile(
+            final OutputStream out,
+            final TupleExpr query,
+            final QueryResultStream<VisibilityBindingSet> resultsStream,
+            final AtomicBoolean shutdownSignal) throws TupleQueryResultHandlerException, IllegalStateException, RyaStreamsException {
+        requireNonNull(out);
+        requireNonNull(query);
+        requireNonNull(resultsStream);
+        requireNonNull(shutdownSignal);
+
+        // Create a writer that does not pretty print.
+        final SPARQLResultsJSONWriter writer = new SPARQLResultsJSONWriter(out);
+        final WriterConfig config = writer.getWriterConfig();
+        config.set(BasicWriterSettings.PRETTY_PRINT, false);
+
+        // Start the JSON and enumerate the possible binding names.
+        writer.startQueryResult( Lists.newArrayList(query.getBindingNames()) );
+
+        while(!shutdownSignal.get()) {
+            for(final VisibilityBindingSet result : resultsStream.poll(1000)) {
+                writer.handleSolution(result);
+            }
+        }
+
+        writer.endQueryResult();
+    }
+}
\ No newline at end of file
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/util/QueryResultsOutputUtilTest.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/util/QueryResultsOutputUtilTest.java
new file mode 100644
index 0000000..b82e671
--- /dev/null
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/util/QueryResultsOutputUtilTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.entity.QueryResultStream;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Unit tests the methods of {@link QueryResultsOutputUtil}.
+ */
+public class QueryResultsOutputUtilTest {
+
+    private static final ValueFactory VF = new ValueFactoryImpl();
+
+    @Test
+    public void toNtriplesFile() throws Exception {
+        // Mock a result stream that signals shutdown when it returns a set of results.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+        final QueryResultStream<VisibilityStatement> resultsStream = mock(QueryResultStream.class);
+        when(resultsStream.poll(anyLong())).thenAnswer(invocation -> {
+            shutdownSignal.set(true);
+
+            final List<VisibilityStatement> results = new ArrayList<>();
+
+            Statement stmt = VF.createStatement(VF.createURI("urn:alice"), VF.createURI("urn:age"), VF.createLiteral(23));
+            results.add( new VisibilityStatement(stmt) );
+
+            stmt = VF.createStatement(VF.createURI("urn:bob"), VF.createURI("urn:worksAt"), VF.createLiteral("Taco Shop"));
+            results.add( new VisibilityStatement(stmt) );
+            return results;
+        });
+
+        // The stream the NTriples will be written to.
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+        // Invoke the test method. This will write the NTriples.
+        QueryResultsOutputUtil.toNtriplesFile(out, resultsStream, shutdownSignal);
+
+        // Show the produced NTriples matches the expected NTriples.
+        final String expected =
+                "<urn:alice> <urn:age> \"23\"^^<http://www.w3.org/2001/XMLSchema#int> .\n" +
+                "<urn:bob> <urn:worksAt> \"Taco Shop\" .\n";
+
+        final String nTriples = new String(out.toByteArray(), Charsets.UTF_8);
+        assertEquals(expected, nTriples);
+    }
+
+    @Test
+    public void toBindingSetJSONFile() throws Exception {
+        // A SPARQL query that uses OPTIONAL values.
+        final String sparql =
+                "SELECT * WHERE { " +
+                    "?name <urn:worksAt> ?company . " +
+                    "OPTIONAL{ ?name <urn:ssn> ?ssn} " +
+                "}";
+        final TupleExpr query = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
+
+        // Mock a results stream that signals shutdown when it returns a set of results.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+        final QueryResultStream<VisibilityBindingSet> resultsStream = mock(QueryResultStream.class);
+        when(resultsStream.poll(anyLong())).thenAnswer(invocation -> {
+            shutdownSignal.set(true);
+
+            final List<VisibilityBindingSet> results = new ArrayList<>();
+
+            // A result with the optional value.
+            MapBindingSet bs = new MapBindingSet();
+            bs.addBinding("name", VF.createLiteral("alice"));
+            bs.addBinding("company", VF.createLiteral("Taco Shop"));
+            bs.addBinding("ssn", VF.createURI("urn:111-11-1111"));
+            results.add(new VisibilityBindingSet(bs, ""));
+
+
+            // A result without the optional value.
+            bs = new MapBindingSet();
+            bs.addBinding("name", VF.createLiteral("bob"));
+            bs.addBinding("company", VF.createLiteral("Cafe"));
+            results.add(new VisibilityBindingSet(bs, ""));
+
+            return results;
+        });
+
+        // The stream the JSON will be written to.
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+        // Invoke the test method. This will write the JSON.
+        QueryResultsOutputUtil.toBindingSetJSONFile(out, query, resultsStream, shutdownSignal);
+
+        // Show the produced JSON matches the expected JSON.
+        final String expected =  "{\"head\":{\"vars\":[\"name\",\"company\",\"ssn\"]},\"results\":{" +
+                "\"bindings\":[{\"name\":{\"type\":\"literal\",\"value\":\"alice\"},\"company\":{" +
+                "\"type\":\"literal\",\"value\":\"Taco Shop\"},\"ssn\":{\"type\":\"uri\",\"value\":" +
+                "\"urn:111-11-1111\"}},{\"name\":{\"type\":\"literal\",\"value\":\"bob\"},\"company\"" +
+                ":{\"type\":\"literal\",\"value\":\"Cafe\"}}]}}";
+        final String json = new String(out.toByteArray(), Charsets.UTF_8);
+        System.out.println(json);
+        assertEquals(expected, json);
+    }
+}
\ No newline at end of file