Merge pull request #1127 from afs/xloader

Xloader (more)
diff --git a/apache-jena/bin/tdb2.xloader b/apache-jena/bin/tdb2.xloader
index 9edd48f..bfd304c 100755
--- a/apache-jena/bin/tdb2.xloader
+++ b/apache-jena/bin/tdb2.xloader
@@ -16,13 +16,71 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 
+## External environment variables.
+##   JENA_CP
+##   JENA_HOME
+##   JVM_ARGS
+
+## Programs used:
+## jq
+## sort
+## /usr/bin/gzip
+
+## ======= Check environment
+
+## Usage: testForProgram CMD ; returns 1, with a message on stderr, if CMD is not found.
+function testForProgram() {
+    local CMD="$1"
+    if ! type -p "$CMD" &> /dev/null ; then
+        echo "Command $CMD not found" 1>&2
+        return 1
+    fi
+}
+
+## Test for "sort --parallel" (without it, the sort phases would be slower).
+function testSortParallel() {
+    set +e
+    if ! sort --parallel=3 < /dev/null &>/dev/null ; then
+        ## Diagnostics go to stderr ("2&>1" wrote them to a file named "1").
+        echo "No --parallel support in sort(1)" 1>&2
+        exit 9
+    fi
+    set -e
+}
+
+JAVA="${JAVA:-java}"
+
+COMPLETE="yes"
+## Check the configured $JAVA rather than a literal "java"; report every miss.
+for F in jq sort /usr/bin/gzip "$JAVA" ; do
+    if ! testForProgram "$F" ; then
+        COMPLETE="no"
+    fi
+done
+
+## String test: the arithmetic "-ne" treated "yes" and "no" alike (both 0).
+if [[ "$COMPLETE" != "yes" ]] ; then
+    echo "One or more programs missing" 1>&2
+    exit 9
+fi
+
+unset COMPLETE
+
+testSortParallel
+
+## ======== Setup
+
+## Environment variable TMPDIR is ignored.
+## it is often a small-ish area unsuitable for large temp files.
+## Use the --tmpdir flag
+
+TMPDIR=
+
+## Format used in logging with date(1).
 DATE="+%H:%M:%S"
-# JENA_CP=
-# JENA_HOME=
-# TMPDIR=
 
 ## Functions.
-# log "LEVEL" "MESSAGE"
+# Usage: log "LEVEL" "MESSAGE"
 function log() {
     local LEVEL="$1"
     local MSG="$2"
@@ -92,6 +150,8 @@
     Do not set to all available RAM.
     Increasing it does not make the loader faster.
 
+The temporary directory defaults to the database directory.
+
 EOF
 }
 
@@ -160,7 +220,6 @@
   #echo "Resolved symbolic links for JENA_HOME to $JENA_HOME"
 fi
 
-
 ## Classpath JENA_CP.
 if [ -z "$JENA_CP" ] ; then
    if [ -z "$JENA_HOME" ]; then
@@ -181,6 +240,9 @@
 while [ $# -gt 0 ]
 do
     ARG=$1
+    ## --tmpdir
+    ## --loc|--location
+    ## --help
     case $ARG in
 	-d|--debug)
 	    # Debug Mode
@@ -226,8 +288,7 @@
     esac
 done
 	       
-if [[ $# -eq 0 ]]
-then
+if [[ $# -eq 0 ]] ; then
     abort 1 "No files to load" 1>&2
 fi
 
@@ -239,9 +300,6 @@
 
 [[ -z $TMPDIR ]] && TMPDIR=$LOC
 export TMPDIR
-## --tmpdir
-## --loc|--location
-## --help
 
 ## TDB1 / TDB2
 ## @@
@@ -269,74 +327,101 @@
 	;;
 esac
 
-## Delete database!
+## Don't mess up an existing database!
 if [ -e "$LOC" ]; then
     ## @@ Better
     abort 3 "Directory $LOC already exists"
 fi
 
-JAVA="${JAVA:-java}"
-
 info "Setup:"
-info "  Data:     $DATAFILES"
 info "  Database: $LOC"
-info "  Tmpdir:   $TMPDIR"
+info "  Data:     $DATAFILES"
+info "  TMPDIR:   $TMPDIR"
 
 # Large heap not required.
 JVM_ARGS="${JVM_ARGS:--Xmx2G}"
 
-## Time points.
+## Time point.
 
 TIME_START="$(now)"
 
-## Node table loading.
+## ======== Node table loading.
 if [ "$SYSTEM" == "TDB2" ]; then
+    ## TDB2 only.
+    info
     T="$(now)"
     info "Load node table"
     exec_java $PKG.CmdxBuildNodeTable --loc $LOC --tmpdir "$TMPDIR" $DATAFILES
     TIME_NODE_TABLE=$(($(now)-$T))
 fi
 
-## Ingest data, create workfiles
+## ======== Ingest data, creates workfiles
 info
 info "Ingest data"
 T="$(now)"
 exec_java $PKG.CmdxIngestData --loc $LOC --tmpdir "$TMPDIR" --triples "$TMPDIR/triples.tmp" --quads "$TMPDIR/quads.tmp" $DATAFILES
 TIME_INGEST=$(($(now)-$T))
 
-## @@ triples.tmp quads.tmp
+## ======== Indexes
+INFO="$TMPDIR/load.json"
+
+## Bash associative array
+declare -A TIME_IDX
 
 function index() {
     local IDX="$1"
+    info
+    info "Build $IDX"
+    local T="$(now)"
     exec_java $PKG.CmdxBuildIndex --loc $LOC --tmpdir "$TMPDIR" --index $IDX \
 	      "$TMPDIR/triples.tmp" "$TMPDIR/quads.tmp"
+    local T_IDX=$(($(now)-$T))
+    TIME_IDX[$IDX]=$T_IDX
 }
 
-info
-info "Build SPO"
-T="$(now)"
-index SPO
-TIME_IDX_SPO=$(($(now)-$T))
+## Decide which indexes to generate.
+TRIPLES_DFT="SPO POS OSP"
+QUADS_DFT="GSPO GPOS GOSP SPOG POSG OSPG"
+
+TRIPLES_IDX="${TRIPLES_IDX:-$TRIPLES_DFT}"
+QUADS_IDX="${QUADS_IDX:-$QUADS_DFT}"
+
+if [ -e "$INFO" ] ; then
+    ## Skip a phase if there are no items to index ($INFO quoted: path may have spaces).
+    TRIPLES="$(jq .triples < "$INFO")"
+    QUADS="$(jq .quads < "$INFO")"
+    if [[ $TRIPLES -eq 0 ]] ; then
+        TRIPLES_IDX=""
+    fi
+    if [[ $QUADS -eq 0 ]] ; then
+        QUADS_IDX=""
+    fi
+fi
+
+## ==== Triples
+
+for IDX in $TRIPLES_IDX ; do
+    index $IDX
+done
+
+## ==== Quads
+
+for IDX in $QUADS_IDX ; do
+    index $IDX
+done
+
+## ======== Finish
+
+## Delete temp files.
+## rm -f "$TMPDIR"/triples.tmp* "$TMPDIR"/quads.tmp*
 
 info
-info "Build POS"
-T="$(now)"
-index POS
-TIME_IDX_POS=$(($(now)-$T))
-
-info
-info "Build OSP"
-T="$(now)"
-index OSP
-let TIME_IDX_OSP=$(($(now)-$T))
-
-## @@
-#rm  "$TMPDIR/triples.tmp" "$TMPDIR/quads.tmp"
-
 TIME_FINISH="$(now)"
 
+## ======== Reporting
 TIME_TOTAL=$(($TIME_FINISH-$TIME_START))
 
+## Ingest
 if [ -n "$TIME_NODE_TABLE" ]; then
     info "Load node table  = $TIME_NODE_TABLE seconds"
 fi
@@ -345,8 +430,27 @@
 TIME_HMS="$(printf '%02dh %02dm %02ds\n' $((SECS/3600)) $((SECS%3600/60)) $((SECS%60)))"
 
 info "Load ingest data = $TIME_INGEST seconds"
-info "Build index SPO  = $TIME_IDX_SPO seconds"
-info "Build index POS  = $TIME_IDX_POS seconds"
-info "Build index OSP  = $TIME_IDX_OSP seconds"
+
+## Indexes
+for IDX in $TRIPLES_IDX ; do
+     info "Build index ${IDX}  = ${TIME_IDX[${IDX}]} seconds"
+done
+for IDX in $QUADS_IDX ; do
+     info "Build index ${IDX} = ${TIME_IDX[${IDX}]} seconds"
+done
+
+## Whole run
 info "Overall          $TIME_TOTAL seconds"
 info "Overall          $TIME_HMS"
+
+if [[ -e $INFO ]] ; then
+    printf -v TRIPLES_STR "%'d" "$TRIPLES"
+    printf -v QUADS_STR "%'d" "$QUADS"
+    info "Triples loaded   = $TRIPLES_STR"
+    info "Quads loaded     = $QUADS_STR"
+    TUPLES=$(($TRIPLES+$QUADS))
+    ## Guard the division in case TIME_TOTAL (whole seconds) is 0.
+    RATE=$(( TUPLES / (TIME_TOTAL > 0 ? TIME_TOTAL : 1) ))
+    printf -v RATE_STR "%'d" "$RATE"
+    info "Overall Rate     $RATE_STR tuples per second"
+fi
diff --git a/jena-cmds/src/main/java/tdb2/xloader/AbstractCmdxLoad.java b/jena-cmds/src/main/java/tdb2/xloader/AbstractCmdxLoad.java
index 912d7b1..ffd1056 100644
--- a/jena-cmds/src/main/java/tdb2/xloader/AbstractCmdxLoad.java
+++ b/jena-cmds/src/main/java/tdb2/xloader/AbstractCmdxLoad.java
@@ -51,12 +51,17 @@
     protected static ArgDecl argTmpdir     = new ArgDecl(true, "tmpdir", "tmp");
     protected static ArgDecl argIndex      = new ArgDecl(true, "index");
 
-    protected static ArgDecl argSortArgs   = new ArgDecl(true, "sortArgs", "sortargs");
+    // If this is put back, note there are two different sorts - one for the node table and several for the indexes.
+    protected static ArgDecl argSortNodeTableArgs   = new ArgDecl(true, "sortNodeTableArgs");
+    protected static ArgDecl argSortIndexArgs   = new ArgDecl(true, "sortIndexArgs");
 
     protected String location = null;
     protected String tmpdir = null;
     protected String indexName = null;
-    protected String sortArgs = null;
+
+    protected String sortNodeTableArgs = null;
+    protected String sortIndexArgs = null;
+
     protected List<String> filenames = null;
 
     protected XLoaderFiles loaderFiles = null;
@@ -89,9 +94,9 @@
         location = super.getValue(argLocation);
         tmpdir = super.getValue(argTmpdir);
         indexName = super.getValue(argIndex);
-        sortArgs = super.getValue(argSortArgs);
 
-
+        sortNodeTableArgs = super.getValue(argSortNodeTableArgs);
+        sortIndexArgs = super.getValue(argSortIndexArgs);
 
         if ( location != null )
             checkDirectory(location);
@@ -104,7 +109,6 @@
         subCheckArgs();
 
         loaderFiles = new XLoaderFiles(tmpdir);
-
     }
 
     private void checkDirectory(String dirname) {
diff --git a/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildIndex.java b/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildIndex.java
index 2c494d0..f7b04e8 100644
--- a/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildIndex.java
+++ b/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildIndex.java
@@ -40,6 +40,7 @@
         super.add(argLocation,  "--loc=", "Database location");
         super.add(argTmpdir,    "--tmpdir=", "Temporary directory (defaults to --loc)");
         super.add(argIndex,     "--index=", "Index name");
+        //super.add(argSortIndexArgs, "--sortIndexArgs=", "Specialised argument for the sort for the indexes");
     }
 
     @Override
@@ -85,7 +86,6 @@
         long items = ProcIndexBuildX.exec(location, indexName, loaderFiles);
 
         long timeMillis = timer.endTimer();
-        //FmtLog.info(LOG, "Done - NodeTable - %s seconds", Timer.timeStr(timeMillis));
 
         double xSec = timeMillis/1000.0;
         double rate = items/xSec;
diff --git a/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildNodeTable.java b/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildNodeTable.java
index 0132781..4b54e5e 100644
--- a/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildNodeTable.java
+++ b/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildNodeTable.java
@@ -26,6 +26,8 @@
 import org.apache.jena.cmd.CmdException;
 import org.apache.jena.tdb2.xloader.BulkLoaderX;
 import org.apache.jena.tdb2.xloader.ProcNodeTableBuilderX;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CmdxBuildNodeTable extends AbstractCmdxLoad {
 
@@ -34,13 +36,14 @@
     }
 
     protected CmdxBuildNodeTable(String[] argv) {
-        super("Terms", argv);
+        super("Nodes", argv);
     }
 
     @Override
     protected void setCmdArgs() {
         super.add(argLocation,  "--loc=", "Database location");
         super.add(argTmpdir,    "--tmpdir=", "Temporary directory (defaults to --loc)");
+        //super.add(argSortNodeTableArgs, "--sortNodeTableArgs=", "Specialised argument for the sort for the node table");
     }
 
     @Override
@@ -77,10 +80,12 @@
         if ( tmpdir == null )
             tmpdir = location;
 
-        Pair<Long/*triples*/, Long/*indexed nodes*/> buildCounts = ProcNodeTableBuilderX.exec(LOG, location, loaderFiles, filenames, sortArgs);
+        Logger LOG1 = LOG;
+        Logger LOG2 = LoggerFactory.getLogger("Terms");
+
+        Pair<Long/*triples or quads*/, Long/*indexed nodes*/> buildCounts = ProcNodeTableBuilderX.exec(LOG1, LOG2, location, loaderFiles, filenames, sortNodeTableArgs);
 
         long timeMillis = timer.endTimer();
-        //FmtLog.info(LOG, "Done - NodeTable - %s seconds", Timer.timeStr(timeMillis));
 
         long items = buildCounts.getLeft();
         double xSec = timeMillis/1000.0;
@@ -88,7 +93,6 @@
         String elapsedStr = BulkLoaderX.milliToHMS(timeMillis);
         String rateStr = BulkLoaderX.rateStr(items, timeMillis);
 
-        FmtLog.info(LOG, "NodeTable - %s seconds - %s at %s TPS", Timer.timeStr(timeMillis), elapsedStr, rateStr);
-        // XXX Write size as tmp file.
+        FmtLog.info(LOG, "NodeTable - %s seconds - %s at %s terms per second", Timer.timeStr(timeMillis), elapsedStr, rateStr);
     }
 }
diff --git a/jena-cmds/src/main/java/tdb2/xloader/CmdxLoader.java b/jena-cmds/src/main/java/tdb2/xloader/CmdxLoader.java
index d005787..b14e8de 100644
--- a/jena-cmds/src/main/java/tdb2/xloader/CmdxLoader.java
+++ b/jena-cmds/src/main/java/tdb2/xloader/CmdxLoader.java
@@ -28,7 +28,7 @@
  * <p>
  * This still requires an external sort programme.
  * Normally, xloader is run by script which uses one JVM per operation.
- * Exiting the JVM and starting a new one
+ * Exiting the JVM and starting a new one clears the process state which is beneficial.
  * <p>
  * This program does not need much RAM. Do not set the heap size large.
  * 4Gbytes is enough, usually 2Gbytes is sufficient.
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/BulkLoaderX.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/BulkLoaderX.java
index b8e8fb1..9b053b1 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/BulkLoaderX.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/BulkLoaderX.java
@@ -24,6 +24,23 @@
     public static int DataTick = 1_000_000;
     public static int DataSuperTick = 10;
 
+    /**
+     * Whether to compress the triples.tmp and quads.tmp files.
+     * These are read multiple times.
+     */
+    public static boolean CompressDataFiles = true;
+
+    /**
+     * Whether to compress intermediate sort files for the node table.
+     * We'll need this amount of space for the final indexes so this isn't helpful.
+     */
+    public static boolean CompressSortNodeTableFiles = false;
+
+    /**
+     * Whether to compress intermediate sort files for the indexes.
+     */
+    public static boolean CompressSortIndexFiles = true;
+
     public static Thread async(Runnable action, String threadName) {
         Objects.requireNonNull(action);
         Objects.requireNonNull(threadName);
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIndexBuildX.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIndexBuildX.java
index 0da3681..fecaaf7 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIndexBuildX.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIndexBuildX.java
@@ -143,31 +143,48 @@
         Process proc2;
         OutputStream toSortOutputStream; // Not used. Input is a file.
         InputStream fromSortInputStream;
+
         try {
             //LOG.info("Step : external sort : "+indexName);
             //if ( sortArgs != null ) {}
+
             List<String> sortCmdBasics = Arrays.asList(
                  "sort",
                     "--temporary-directory="+TMPDIR, "--buffer-size=50%",
                     "--parallel=2", "--unique"
                     //, "--compress-program=/usr/bin/gzip"
-                );
+            );
             List<String> sortCmd = new ArrayList<>(sortCmdBasics);
+            if ( BulkLoaderX.CompressSortIndexFiles )
+                sortCmd.add("--compress-program=/usr/bin/gzip");
             sortCmd.addAll(sortKeyArgs);
-            // Add the file to sort.
-            sortCmd.add(datafile);
+            // Add the file to sort if not compressed.
+            if ( ! BulkLoaderX.CompressDataFiles )
+                sortCmd.add(datafile);
+            // else this process will decompress and send the data.
+
             proc2 = new ProcessBuilder(sortCmd).start();
-            // To process. Not used.
+
+            // To process. Not used if uncompressed file.
             toSortOutputStream = proc2.getOutputStream();
             // From process
             fromSortInputStream = proc2.getInputStream(); // Needs buffering
-            // Debug sort process.
+//            // Debug sort process.
 //            InputStream fromSortErrortStream = proc2.getErrorStream();
 //            IOUtils.copy(fromSortErrortStream, System.err);
         } catch (Exception ex) {
             throw new RuntimeException(ex);
         }
 
+        if ( BulkLoaderX.CompressDataFiles ) {
+            // Handles .gz
+            // try-with-resources: the data file stream is closed, even on failure.
+            try ( InputStream inData = IO.openFile(datafile) ) {
+                inData.transferTo(toSortOutputStream);
+                toSortOutputStream.close();
+            } catch (IOException ex) { IO.exception(ex); }
+        }
+
         // From sort, buffered.
         InputStream input = IO.ensureBuffered(fromSortInputStream);
         // This thread - run builder.
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIngestDataX.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIngestDataX.java
index 78a144f..9235e7f 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIngestDataX.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIngestDataX.java
@@ -18,13 +18,18 @@
 
 package org.apache.jena.tdb2.xloader;
 
+import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 
 import org.apache.jena.atlas.io.IO;
+import org.apache.jena.atlas.json.JSON;
+import org.apache.jena.atlas.json.JsonObject;
 import org.apache.jena.atlas.lib.BitsLong;
 import org.apache.jena.atlas.lib.DateTimeUtils;
+import org.apache.jena.atlas.lib.Pair;
 import org.apache.jena.atlas.lib.Timer;
+import org.apache.jena.atlas.logging.FmtLog;
 import org.apache.jena.dboe.base.file.Location;
 import org.apache.jena.dboe.sys.Names;
 import org.apache.jena.graph.Node;
@@ -70,23 +75,41 @@
         ProgressMonitor monitor = ProgressMonitorOutput.create(LOG, "Data", BulkLoaderX.DataTick, BulkLoaderX.DataSuperTick);
         // WriteRows does it's own buffering and has direct write-to-buffer.
         // Do not buffer here.
+        // Adds gzip processing.
         OutputStream outputTriples = IO.openOutputFile(loaderFiles.triplesFile);
         OutputStream outputQuads = IO.openOutputFile(loaderFiles.quadsFile);
 
-        dsg.executeWrite(()->build(dsg, monitor,
-                                   outputTriples, outputQuads,
-                                   datafiles));
+        OutputStream outT = outputTriples;
+        OutputStream outQ = outputQuads;
+        dsg.executeWrite(() -> {
+            Pair<Long, Long> p = build(dsg, monitor, outT, outQ, datafiles);
+            String str = DateTimeUtils.nowAsXSDDateTimeString();
+            long cTriple = p.getLeft();
+            long cQuad = p.getRight();
+            FmtLog.info(LOG, "Triples = %,d ; Quads = %,d", cTriple, cQuad);
+            JsonObject obj = JSON.buildObject(b->{
+                b.pair("ingested", str);
+                b.key("data").startArray();
+                datafiles.forEach(fn->b.value(fn));
+                b.finishArray();
+                b.pair("triples", cTriple);
+                b.pair("quads", cQuad);
+            });
+            try ( OutputStream out = IO.openOutputFile(loaderFiles.loadInfo) ) {
+                JSON.write(out, obj);
+            } catch (IOException ex) { IO.exception(ex); }
+        });
         TDBInternal.expel(dsg);
         SystemIRIx.setProvider(provider);
     }
 
-    private static void build(DatasetGraph dsg, ProgressMonitor monitor,
+    private static Pair<Long, Long> build(DatasetGraph dsg, ProgressMonitor monitor,
                               OutputStream outputTriples, OutputStream outputQuads,
                               List<String> datafiles) {
         DatasetGraphTDB dsgtdb = TDBInternal.getDatasetGraphTDB(dsg);
         outputTriples = IO.ensureBuffered(outputTriples);
         outputQuads = IO.ensureBuffered(outputQuads);
-        NodeTableBuilder sink = new NodeTableBuilder(dsgtdb, monitor, outputTriples, outputQuads, false);
+        IngestData sink = new IngestData(dsgtdb, monitor, outputTriples, outputQuads, false);
         Timer timer = new Timer();
         timer.startTimer();
         // [BULK] XXX Start monitor on first item from parser.
@@ -102,6 +125,9 @@
         IO.close(outputTriples);
         IO.close(outputQuads);
 
+        long cTriple = sink.tripleCount();
+        long cQuad = sink.quadCount();
+
         // ---- Stats
 
         // See Stats class.
@@ -120,21 +146,22 @@
         String str =  String.format("Total: %,d tuples : %,.2f seconds : %,.2f tuples/sec [%s]",
                                     total, elapsedSecs, rate, DateTimeUtils.nowAsString());
         LOG.info(str);
+        return Pair.create(cTriple, cQuad);
     }
 
-    static class NodeTableBuilder implements StreamRDF
-    {
+    static class IngestData implements StreamRDF {
         private DatasetGraphTDB dsg;
         private NodeTable nodeTable;
+        long countTriples = 0;
+        long countQuads = 0;
         private WriteRows writerTriples;
         private WriteRows writerQuads;
         private ProgressMonitor monitor;
         private StatsCollectorNodeId stats;
 
-        NodeTableBuilder(DatasetGraphTDB dsg, ProgressMonitor monitor,
-                         OutputStream outputTriples, OutputStream outputQuads,
-                         boolean collectStats)
-        {
+        IngestData(DatasetGraphTDB dsg, ProgressMonitor monitor,
+                   OutputStream outputTriples, OutputStream outputQuads,
+                   boolean collectStats) {
             this.dsg = dsg;
             this.monitor = monitor;
             NodeTupleTable ntt = dsg.getTripleTable().getNodeTupleTable();
@@ -145,62 +172,58 @@
                 this.stats = new StatsCollectorNodeId(nodeTable);
         }
 
-        //@Override
-        public void startBulk()
-        {}
+        // @Override
+        public void startBulk() {}
 
         @Override
-        public void start()
-        {}
+        public void start() {}
 
         @Override
-        public void finish()
-        {}
+        public void finish() {}
 
-        //@Override
-        public void finishBulk()
-        {
+        // @Override
+        public void finishBulk() {
             writerTriples.flush();
             writerQuads.flush();
             nodeTable.sync();
 
-           //dsg.getStoragePrefixes().sync();
+            // dsg.getStoragePrefixes().sync();
         }
 
         @Override
-        public void triple(Triple triple)
-        {
+        public void triple(Triple triple) {
+            countTriples++;
             Node s = triple.getSubject();
             Node p = triple.getPredicate();
             Node o = triple.getObject();
-            process(Quad.tripleInQuad,s,p,o);
+            process(Quad.tripleInQuad, s, p, o);
         }
 
         @Override
-        public void quad(Quad quad)
-        {
+        public void quad(Quad quad) {
+            countQuads++;
             Node s = quad.getSubject();
             Node p = quad.getPredicate();
             Node o = quad.getObject();
             Node g = null;
             // Union graph?!
-            if ( ! quad.isTriple() && ! quad.isDefaultGraph() )
+            if ( !quad.isTriple() && !quad.isDefaultGraph() )
                 g = quad.getGraph();
-            process(g,s,p,o);
+            process(g, s, p, o);
         }
 
         // --> From NodeIdFactory
         private static long encode(NodeId nodeId) {
             long x = nodeId.getPtrLocation(); // Should be "getValue"
-            switch(nodeId.type()) {
-                case PTR:
+            switch (nodeId.type()) {
+                case PTR :
                     return x;
-                case XSD_DOUBLE:
+                case XSD_DOUBLE :
                     // XSD_DOUBLE is special.
                     // Set value bit (63) and bit 62
                     x = DoubleNode62.insertType(x);
                     return x;
-                default:
+                default :
                     // Bit 62 is zero - tag is for doubles.
                     x = BitsLong.pack(x, nodeId.getTypeValue(), 56, 62);
                     // Set the high, value bit.
@@ -239,11 +262,16 @@
             monitor.tick();
         }
 
-        public StatsCollectorNodeId getCollector() { return stats; }
+        public StatsCollectorNodeId getCollector() {
+            return stats;
+        }
+
+        public long tripleCount() { return countTriples; }
+
+        public long quadCount()   { return countQuads; }
 
         @Override
-        public void base(String base)
-        {}
+        public void base(String base) {}
 
         @Override
         public void prefix(String prefix, String iri) {
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcNodeTableBuilderX.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcNodeTableBuilderX.java
index 8bbe2b9..e13f045 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcNodeTableBuilderX.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcNodeTableBuilderX.java
@@ -24,6 +24,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -54,12 +56,12 @@
 import org.apache.jena.riot.thrift.wire.RDF_Term;
 import org.apache.jena.sparql.core.DatasetGraph;
 import org.apache.jena.sparql.core.Quad;
-import org.apache.jena.tdb2.DatabaseMgr;
-import org.apache.jena.tdb2.TDBException;
-import org.apache.jena.tdb2.lib.NodeLib;
 import org.apache.jena.system.progress.ProgressIterator;
 import org.apache.jena.system.progress.ProgressMonitorOutput;
 import org.apache.jena.system.progress.ProgressStreamRDF;
+import org.apache.jena.tdb2.DatabaseMgr;
+import org.apache.jena.tdb2.TDBException;
+import org.apache.jena.tdb2.lib.NodeLib;
 import org.apache.jena.tdb2.store.DatasetGraphTDB;
 import org.apache.jena.tdb2.store.Hash;
 import org.apache.jena.tdb2.store.NodeId;
@@ -79,10 +81,13 @@
 public class ProcNodeTableBuilderX {
 
     /** Pair<triples, indexed nodes> */
-    // [BULK} Output, not return.
-    public static Pair<Long, Long> exec(Logger LOG, String DB, XLoaderFiles loaderFiles, List<String> datafiles, String sortArgs) {
+    // [BULK] Output, not return.
+    public static Pair<Long, Long> exec(Logger LOG1, Logger LOG2, String DB, XLoaderFiles loaderFiles, List<String> datafiles, String sortNodeTableArgs) {
         //Threads - 1 parser, 1 builder, 2 sort.
-        // Excludes counting inlines.
+        // Steps:
+        // 1 - parse and pipe terms to sort
+        // 2 - sort
+        // 3 - build node table from unique sort
 
         IRIProvider provider = SystemIRIx.getProvider();
         //SystemIRIx.setProvider(new IRIProviderAny());
@@ -99,22 +104,27 @@
         Process proc2;
         try {
             //LOG.info("Step : external sort");
-            String[] sortCmd =
-                { "sort",
+            List<String> sortCmdBasics = Arrays.asList(
+                  "sort",
                     "--temporary-directory="+loaderFiles.TMPDIR, "--buffer-size=50%",
                     "--parallel=2", "--unique",
-                    // Don't compress. There will be enough space because later we work on indexes.
                     //"--compress-program=/usr/bin/gzip",
                     "--key=1,1"
-                };
-            if ( sortArgs != null ) {}
+                    );
 
+            List<String> sortCmd = new ArrayList<>(sortCmdBasics);
+
+            //if ( sortNodeTableArgs != null ) {}
+
+            // See javadoc for CompressSortNodeTableFiles - usually false
+            if ( BulkLoaderX.CompressSortNodeTableFiles )
+                sortCmd.add("--compress-program=/usr/bin/gzip");
             proc2 = new ProcessBuilder(sortCmd).start();
+
             // To process.
             toSortOutputStream = proc2.getOutputStream(); // Needs buffering
             // From process
             fromSortInputStream = proc2.getInputStream(); // Needs buffering
-
 //            // Debug sort process.
 //            InputStream fromSortErrortStream = proc2.getErrorStream();
 //            IOUtils.copy(fromSortErrortStream, System.err);
@@ -132,7 +142,7 @@
 
         //LOG.info("Step : parse & send to external sort");
         Runnable task1 = ()->{
-            ProgressMonitorOutput monitor = ProgressMonitorOutput.create(LOG, "Nodes", tickPoint, superTick);
+            ProgressMonitorOutput monitor = ProgressMonitorOutput.create(LOG1, "Nodes", tickPoint, superTick);
             OutputStream output = IO.ensureBuffered(toSortOutputStream);
             // Counting.
             StreamRDF worker = new NodeHashTmpStream(output);
@@ -160,14 +170,14 @@
 
             double xSec = x/1000.0;
             double rate = count/xSec;
-            FmtLog.info(LOG, "== Parse: %s seconds : %,d triples/quads %,.0f TPS", Timer.timeStr(x), count, rate);
+            FmtLog.info(LOG1, "== Parse: %s seconds : %,d triples/quads %,.0f TPS", Timer.timeStr(x), count, rate);
         };
 
         // [BULK] XXX AsyncParser.asyncParse(files, output)
         Thread thread1 = async(task1, "AsyncParser");
 
-        //LOG.info("Step : read external sort, build node-data file, build index");
-        Runnable task2 = ()->{
+        // Step3: build node table.
+        Runnable task3 = ()->{
             Timer timer = new Timer();
             // Don't start timer until sort send something
             InputStream input = IO.ensureBuffered(fromSortInputStream);
@@ -176,14 +186,12 @@
             BufferChannel blkState = FileFactory.createBufferChannel(fileSet, Names.extBptState);
             long idxTickPoint = BulkLoaderX.DataTick;
             int idxSuperTick = BulkLoaderX.DataSuperTick;
-            ProgressMonitorOutput monitor = ProgressMonitorOutput.create(LOG, "Index", idxTickPoint, idxSuperTick);
-
-            // SLOW
+            ProgressMonitorOutput monitor = ProgressMonitorOutput.create(LOG2, "Index", idxTickPoint, idxSuperTick);
 
             // Library of tools!
             dsg.executeWrite(()->{
                 BinaryDataFile objectFile = nodeTable.getData();
-                Iterator<Record> rIter = records(LOG, input, objectFile);
+                Iterator<Record> rIter = records(LOG2, input, objectFile);
                 rIter = new ProgressIterator<>(rIter, monitor);
                 // Record of (hash, nodeId)
                 BPlusTree bpt1 = (BPlusTree)(nodeTable.getIndex());
@@ -210,23 +218,23 @@
             long count = monitor.getTicks();
             countIndexedNodes.set(count);
             String rateStr = BulkLoaderX.rateStr(count, x);
-            FmtLog.info(LOG, "==  Index: %s seconds : %,d indexed RDF terms : %s PerSecond", Timer.timeStr(x), count, rateStr);
+            FmtLog.info(LOG2, "==  Index: %s seconds : %,d indexed RDF terms : %s PerSecond", Timer.timeStr(x), count, rateStr);
 
         };
-        Thread thread2 = async(task2, "AsyncBuild");
+        Thread thread3 = async(task3, "AsyncBuild");
 
         try {
             int x = proc2.waitFor();
             if ( x != 0 )
-                FmtLog.error(LOG, "Sort RC = %d", x);
+                FmtLog.error(LOG2, "Sort RC = %d", x);
             else
-                LOG.info("Sort finished");
+                LOG2.info("Sort finished");
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
 
         BulkLoaderX.waitFor(thread1);
-        BulkLoaderX.waitFor(thread2);
+        BulkLoaderX.waitFor(thread3);
 
         return Pair.create(countParseTicks.get(), countIndexedNodes.get());
     }
@@ -336,7 +344,7 @@
     static class NodeHashTmpStream implements StreamRDF {
 
         private final OutputStream outputData;
-        private CacheSet<Node> cache = CacheFactory.createCacheSet(100_000);
+        private CacheSet<Node> cache = CacheFactory.createCacheSet(500_000);
 
         NodeHashTmpStream(OutputStream outputFile) {
             this.outputData = outputFile;
@@ -378,7 +386,6 @@
                 return;
             cache.add(node);
             // -- Hash of node
-            //SystemTDB.LenNodeHash, SystemTDB.SizeOfNodeId
             NodeLib.setHash(hash, node);
             try {
                 byte k[] = hash.getBytes();
@@ -388,7 +395,6 @@
                 outputData.write(' ');
                 write(outputData, tBytes);
                 outputData.write('\n');
-
             } catch (TException e) {
                 e.printStackTrace();
             } catch (IOException e) {
@@ -411,6 +417,5 @@
         public void finish() {
             IO.flush(outputData);
         }
-
     }
 }
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/XLoaderFiles.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/XLoaderFiles.java
index a14c19d..4d57578 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/XLoaderFiles.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/XLoaderFiles.java
@@ -26,7 +26,7 @@
     // Fixed file names in temporary directory
     static final String nameTriplesFile = "triples.tmp";
     static final String nameQuadsFile = "quads.tmp";
-    static final String nameLoadInfo = "load.txt";
+    static final String nameLoadInfo = "load.json";
 
     // Names.
     public final String TMPDIR;
@@ -34,10 +34,12 @@
     public final String quadsFile;
     public final String loadInfo;
     public XLoaderFiles(String TMPDIR) {
+        String ext = BulkLoaderX.CompressDataFiles ? ".gz" : "";
+
         this.TMPDIR = Objects.requireNonNull(TMPDIR);
         Path loc = Path.of(TMPDIR);
-        triplesFile = loc.resolve(nameTriplesFile).toString();
-        quadsFile = loc.resolve(nameQuadsFile).toString();
+        triplesFile = loc.resolve(nameTriplesFile).toString()+ext;
+        quadsFile = loc.resolve(nameQuadsFile).toString()+ext;
         loadInfo = loc.resolve(nameLoadInfo).toString();
     }
 }
\ No newline at end of file
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/package-info.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/package-info.java
new file mode 100644
index 0000000..3f9c1eb
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.xloader;
+