GH-2400: RDFParser.toDatasetGraph() transactions

Calling RDFParser.toDatasetGraph() creates a fresh transactional
DatasetGraph and then proceeds to using it without transactions meaning
every triple/quad output is a separate auto-committed write transaction.
This behaviour is changed to place the entire parsing process inside a
write transaction to boost performance.
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/RDFParser.java b/jena-arq/src/main/java/org/apache/jena/riot/RDFParser.java
index 278dd8e..f7bee90 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/RDFParser.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/RDFParser.java
@@ -55,6 +55,7 @@
 import org.apache.jena.sparql.core.DatasetGraphFactory;
 import org.apache.jena.sparql.graph.GraphFactory;
 import org.apache.jena.sparql.util.Context;
+import org.apache.jena.system.Txn;
 
 /**
  * An {@link RDFParser} is a process that will generate triples and quads;
@@ -325,19 +326,27 @@
 
     /**
      * Parse the source in to a fresh {@link Dataset} and return the dataset.
+     * <p>
+     * It may be preferable to instead call {@link #parse(Dataset)} supplying your desired {@link Dataset}
+     * implementation instead depending on how you intend to further process the parsed data.
+     * </p>
      */
     public Dataset toDataset() {
         Dataset dataset = DatasetFactory.createTxnMem();
-        parse(dataset);
+        dataset.executeWrite(() -> parse(dataset));
         return dataset;
     }
 
     /**
      * Parse the source in to a fresh {@link DatasetGraph} and return the DatasetGraph.
+     * <p>
+     * It may be preferable to instead call {@link #parse(DatasetGraph)} supplying your desired {@link DatasetGraph}
+     * implementation instead depending on how you intend to further process the parsed data.
+     * </p>
      */
     public DatasetGraph toDatasetGraph() {
         DatasetGraph dataset = DatasetGraphFactory.createTxnMem();
-        parse(StreamRDFLib.dataset(dataset));
+        dataset.executeWrite(() -> parse(StreamRDFLib.dataset(dataset)));
         return dataset;
     }
 
diff --git a/jena-arq/src/test/java/org/apache/jena/riot/TestRDFParser.java b/jena-arq/src/test/java/org/apache/jena/riot/TestRDFParser.java
index 129c55e..ec8adb6 100644
--- a/jena-arq/src/test/java/org/apache/jena/riot/TestRDFParser.java
+++ b/jena-arq/src/test/java/org/apache/jena/riot/TestRDFParser.java
@@ -18,11 +18,6 @@
 
 package org.apache.jena.riot;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.io.StringReader;
@@ -41,45 +36,54 @@
 import org.apache.jena.riot.system.PrefixMapFactory;
 import org.apache.jena.riot.system.stream.LocatorFile;
 import org.apache.jena.riot.system.stream.StreamManager;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.DatasetGraphFactory;
 import org.apache.jena.sparql.graph.GraphFactory;
 import org.apache.jena.sparql.sse.SSE;
+import org.apache.jena.system.Txn;
+import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class TestRDFParser {
 
     // Location of test files.
     private static String DIR = "testing/RIOT/Parser/";
     private static String testdata = "@prefix : <http://example/ns#> . :x :x _:b .";
 
-    @Test public void source_not_uri_01() {
+    @Test
+    public void source_not_uri_01() {
         Graph graph = GraphFactory.createGraphMem();
         RDFParserBuilder.create().lang(Lang.TTL).fromString(testdata).parse(graph);
         assertEquals(1, graph.size());
     }
 
-    @Test public void source_not_uri_02() {
+    @Test
+    public void source_not_uri_02() {
         Graph graph = GraphFactory.createGraphMem();
         InputStream input = new ByteArrayInputStream(testdata.getBytes(StandardCharsets.UTF_8));
         RDFParser.create().lang(Lang.TTL).source(input).parse(graph);
         assertEquals(1, graph.size());
     }
 
-    @Test public void source_uri_01() {
+    @Test
+    public void source_uri_01() {
         Graph graph = GraphFactory.createGraphMem();
-        RDFParser.create().source("file:"+DIR+"data.ttl").parse(graph);
+        RDFParser.create().source("file:" + DIR + "data.ttl").parse(graph);
         assertEquals(3, graph.size());
     }
 
-    @Test(expected=RiotException.class)
+    @Test(expected = RiotException.class)
     public void source_uri_02() {
         Graph graph = GraphFactory.createGraphMem();
-        RDFParser.create().source("file:"+DIR+"data.unknown").parse(graph);
+        RDFParser.create().source("file:" + DIR + "data.unknown").parse(graph);
     }
 
     @Test
     public void source_uri_03() {
         Graph graph = GraphFactory.createGraphMem();
-        RDFParser.create().source("file:"+DIR+"data.unknown").lang(Lang.TTL).parse(graph);
+        RDFParser.create().source("file:" + DIR + "data.unknown").lang(Lang.TTL).parse(graph);
         assertEquals(3, graph.size());
     }
 
@@ -87,8 +91,8 @@
     public void source_uri_04() {
         Graph graph = GraphFactory.createGraphMem();
         RDFParser.create()
-            .source(Path.of(DIR+"data.ttl"))
-            .parse(graph);
+                 .source(Path.of(DIR + "data.ttl"))
+                 .parse(graph);
         assertEquals(3, graph.size());
     }
 
@@ -97,46 +101,47 @@
         // Last source wins.
         Graph graph = GraphFactory.createGraphMem();
         RDFParser.create()
-            .source("http://example/")
-            .source(DIR+"data.ttl")
-            .parse(graph);
+                 .source("http://example/")
+                 .source(DIR + "data.ttl")
+                 .parse(graph);
         assertEquals(3, graph.size());
     }
 
     // Shortcut source
-    @Test public void source_shortcut_01() {
+    @Test
+    public void source_shortcut_01() {
         Graph graph = GraphFactory.createGraphMem();
         RDFParser.fromString(testdata, Lang.TTL).parse(graph);
         assertEquals(1, graph.size());
     }
 
-    @Test(expected=RiotNotFoundException.class)
+    @Test(expected = RiotNotFoundException.class)
     public void source_notfound_1() {
         // Last source wins.
         Graph graph = GraphFactory.createGraphMem();
         RDFParser.create()
-            .source(Path.of(DIR+"data.nosuchfile.ttl"))
-            .parse(graph);
+                 .source(Path.of(DIR + "data.nosuchfile.ttl"))
+                 .parse(graph);
         assertEquals(3, graph.size());
     }
 
-    @Test(expected=RiotNotFoundException.class)
+    @Test(expected = RiotNotFoundException.class)
     public void source_notfound_2() {
         // Last source wins.
         Graph graph = GraphFactory.createGraphMem();
         RDFParser.create()
-            .source(DIR+"data.nosuchfile.ttl")
-            .parse(graph);
+                 .source(DIR + "data.nosuchfile.ttl")
+                 .parse(graph);
         assertEquals(3, graph.size());
     }
 
-    @Test(expected=RiotException.class)
+    @Test(expected = RiotException.class)
     public void source_uri_hint_lang() {
         Graph graph = GraphFactory.createGraphMem();
         RDFParser.create().source("file:data.rdf")
-            .lang(Lang.RDFXML)
-            .errorHandler(ErrorHandlerFactory.errorHandlerNoLogging)
-            .parse(graph);
+                 .lang(Lang.RDFXML)
+                 .errorHandler(ErrorHandlerFactory.errorHandlerNoLogging)
+                 .parse(graph);
         assertEquals(3, graph.size());
     }
 
@@ -144,32 +149,32 @@
     public void source_string() {
         Graph graph = GraphFactory.createGraphMem();
         RDFParser.create().fromString("<x> <p> <z> .")
-            .lang(Lang.NT)
-            .parse(graph);
+                 .lang(Lang.NT)
+                 .parse(graph);
         assertEquals(1, graph.size());
     }
 
-    @Test(expected=RiotException.class)
+    @Test(expected = RiotException.class)
     public void errorHandler() {
         Graph graph = GraphFactory.createGraphMem();
         // This test file contains Turtle.
-        RDFParser.create().source(DIR+"data.rdf")
-            // and no test log output.
-            .errorHandler(ErrorHandlerFactory.errorHandlerNoLogging)
-            .parse(graph);
+        RDFParser.create().source(DIR + "data.rdf")
+                 // and no test log output.
+                 .errorHandler(ErrorHandlerFactory.errorHandlerNoLogging)
+                 .parse(graph);
     }
 
     @Test
     public void source_uri_force_lang() {
         Graph graph = GraphFactory.createGraphMem();
-        RDFParser.create().source("file:"+DIR+"data.rdf").forceLang(Lang.TTL).parse(graph);
+        RDFParser.create().source("file:" + DIR + "data.rdf").forceLang(Lang.TTL).parse(graph);
         assertEquals(3, graph.size());
     }
 
     @Test
     public void source_streamManager() {
         StreamManager sMgr = new StreamManager();
-        sMgr.addLocator(new LocatorFile(DIR)) ;
+        sMgr.addLocator(new LocatorFile(DIR));
         Graph graph = GraphFactory.createGraphMem();
         RDFParser.create().streamManager(sMgr).source("file:data.rdf").forceLang(Lang.TTL).parse(graph);
         assertEquals(3, graph.size());
@@ -177,6 +182,7 @@
 
     private static class TestingFactoryRDF extends FactoryRDFStd {
         int counter = 0;
+
         @Override
         public Node createURI(String uriStr) {
             counter++;
@@ -189,13 +195,14 @@
         return RDFParserBuilder.create().lang(Lang.TTL).source(input);
     }
 
-    @Test public void labels_01() {
+    @Test
+    public void labels_01() {
         Graph graph = GraphFactory.createGraphMem();
         //LabelToNode.createUseLabelEncoded() ;
 
         builder()
-            .labelToNode(LabelToNode.createUseLabelAsGiven())
-            .parse(graph);
+                .labelToNode(LabelToNode.createUseLabelAsGiven())
+                .parse(graph);
         assertEquals(1, graph.size());
         StringWriter sw = new StringWriter();
         RDFDataMgr.write(sw, graph, Lang.NT);
@@ -203,31 +210,36 @@
         assertTrue(s.contains("_:Bb"));
     }
 
-    @Test public void factory_01() {
+    @Test
+    public void factory_01() {
         TestingFactoryRDF f = new TestingFactoryRDF();
         Graph graph = GraphFactory.createGraphMem();
         builder()
-            .factory(f)
-            .parse(graph);
+                .factory(f)
+                .parse(graph);
         assertEquals(1, graph.size());
         assertNotEquals(0, f.counter);
     }
 
     // Canonical literals.
 
-    @Test public void canonical_value_1() {
+    @Test
+    public void canonical_value_1() {
         testNormalization("0123", "0123", builder().canonicalValues(false));
     }
 
-    @Test public void canonical_value_2() {
+    @Test
+    public void canonical_value_2() {
         testNormalization("+123", "123", builder().canonicalValues(true));
     }
 
-    @Test public void canonical_value_3() {
+    @Test
+    public void canonical_value_3() {
         testNormalization("+123.00", "123.0", builder().canonicalValues(true));
     }
 
-    @Test public void canonical_value_4() {
+    @Test
+    public void canonical_value_4() {
         testNormalization("+123.00e0", "1.23E2", builder().canonicalValues(true));
     }
 
@@ -248,9 +260,9 @@
     public void parser_fragment() {
         PrefixMap pmap = PrefixMapFactory.create(Map.of("", "http://example/"));
         Graph g = RDFParser.fromString("<s> :p :o .", Lang.TTL)
-                .prefixes(pmap)
-                .base("http://base/")
-                .toGraph();
+                           .prefixes(pmap)
+                           .base("http://base/")
+                           .toGraph();
         assertFalse(g.isEmpty());
         Graph g2 = GraphFactory.createDefaultGraph();
         g2.add(NodeFactory.createURI("http://base/s"),
@@ -265,11 +277,72 @@
 
     private void testNormalization(String input, String output, RDFParserBuilder builder) {
         Graph graph = GraphFactory.createGraphMem();
-        String x = PREFIX+":s :p "+input;
+        String x = PREFIX + ":s :p " + input;
         builder.source(new StringReader(x)).parse(graph);
         assertEquals(1, graph.size());
         Node objExpected = SSE.parseNode(output);
         Node objObtained = graph.find(s, p, null).next().getObject();
         assertEquals(objExpected, objObtained);
     }
+
+    @Test
+    public void parse_to_dataset_implicit() {
+        RDFParserBuilder builder = RDFParserBuilder.create().lang(Lang.TRIG).fromString(testdata);
+        DatasetGraph dsg = builder.toDatasetGraph();
+        assertEquals(dsg.stream().count(), 1);
+    }
+
+    @Test
+    public void parse_to_dataset_supplied() {
+        RDFParserBuilder builder = RDFParserBuilder.create().lang(Lang.TRIG).fromString(testdata);
+        DatasetGraph dsg = DatasetGraphFactory.create();
+        builder.parse(dsg);
+        assertEquals(dsg.stream().count(), 1);
+    }
+
+    @Test
+    public void parse_to_dataset_supplied_in_transaction() {
+        RDFParserBuilder builder = RDFParserBuilder.create().lang(Lang.TRIG).fromString(testdata);
+        DatasetGraph dsg = DatasetGraphFactory.createTxnMem();
+        dsg.executeWrite(() -> builder.parse(dsg));
+        assertEquals(dsg.stream().count(), 1);
+    }
+
+    @Test
+    public void parse_to_dataset_malformed_01() {
+        RDFParserBuilder builder = RDFParserBuilder.create().lang(Lang.TRIG).fromString(testdata + "\njunk data goes here");
+        DatasetGraph dsg = DatasetGraphFactory.createTxnMem();
+        try {
+            Txn.executeWrite(dsg, () -> builder.parse(dsg));
+            Assert.fail("Parsing should have produced an error");
+        } catch (RiotException e) {
+            // Size should be zero as failure to parse should abort the transaction and produce an empty dataset
+            assertEquals(dsg.stream().count(), 0);
+        }
+    }
+
+    @Test
+    public void parse_to_dataset_malformed_01a() {
+        RDFParserBuilder builder = RDFParserBuilder.create().lang(Lang.TRIG).fromString(testdata + "\njunk data goes here");
+        DatasetGraph dsg = DatasetGraphFactory.createTxnMem();
+        try {
+            builder.parse(dsg);
+            Assert.fail("Parsing should have produced an error");
+        } catch (RiotException e) {
+            // Without a transaction the valid quad would have gone into the dataset prior to the error occurring
+            assertEquals(dsg.stream().count(), 1);
+        }
+    }
+
+    @Test
+    public void parse_to_dataset_malformed_02() {
+        RDFParserBuilder builder = RDFParserBuilder.create().lang(Lang.TRIG).fromString(testdata + "\njunk data goes here");
+        DatasetGraph dsg = null;
+        try {
+            dsg = builder.toDatasetGraph();
+            Assert.fail("Parsing should have produced an error");
+        } catch (RiotException e) {
+            assertNull(dsg);
+        }
+    }
 }