Merge pull request #1127 from afs/xloader
Xloader (more)
diff --git a/apache-jena/bin/tdb2.xloader b/apache-jena/bin/tdb2.xloader
index 9edd48f..bfd304c 100755
--- a/apache-jena/bin/tdb2.xloader
+++ b/apache-jena/bin/tdb2.xloader
@@ -16,13 +16,71 @@
## See the License for the specific language governing permissions and
## limitations under the License.
+## External environment variables.
+## JENA_CP
+## JENA_HOME
+## JVM_ARGS
+
+## Programs used:
+## jq
+## sort
+## /usr/bin/gzip
+
+## ======= Check environment
+## Usage: testForProgram CMD -- returns 1, with a message on stderr, if CMD is not found.
+function testForProgram() {
+ local CMD="$1"
+ if ! type -p "$CMD" &> /dev/null ; then
+ echo "Command $CMD not found" 1>&2
+ return 1
+ fi
+ return 0
+}
+
+## Check that sort(1) supports --parallel (used by the Java sort steps); exit 9 if not.
+function testSortParallel() {
+ set +e
+ sort --parallel=3 < /dev/null &>/dev/null
+ if [[ $? -ne 0 ]]; then
+ echo "No --parallel support in sort(1)" 1>&2
+ exit 9
+ fi
+ set -e ## NB: errexit remains enabled after this function returns.
+}
+
+JAVA="${JAVA:-java}"
+## Check all required programs first so that every missing one is reported.
+COMPLETE="yes"
+for F in jq sort /usr/bin/gzip "$JAVA"
+do
+ if ! testForProgram "$F" ; then
+ COMPLETE="no"
+ fi
+done
+
+## String comparison: "-ne" is arithmetic inside [[ ]] and would never fire here.
+if [[ "$COMPLETE" != "yes" ]] ; then
+ echo "One or more programs missing" 1>&2
+ exit 9
+fi
+
+unset COMPLETE
+
+testSortParallel
+
+## ======== Setup
+
+## The TMPDIR environment variable is ignored:
+## it often points to a small area unsuitable for large temporary files.
+## Use the --tmpdir flag instead.
+
+TMPDIR= ## Reset here; set by --tmpdir, otherwise later defaults to the database location.
+
+## Format used in logging with date(1).
DATE="+%H:%M:%S"
-# JENA_CP=
-# JENA_HOME=
-# TMPDIR=
## Functions.
-# log "LEVEL" "MESSAGE"
+# Usage: log "LEVEL" "MESSAGE"
function log() {
local LEVEL="$1"
local MSG="$2"
@@ -92,6 +150,8 @@
Do not set to all available RAM.
Increasing it does not make the loader faster.
+The temporary directory defaults to the database directory.
+
EOF
}
@@ -160,7 +220,6 @@
#echo "Resolved symbolic links for JENA_HOME to $JENA_HOME"
fi
-
## Classpath JENA_CP.
if [ -z "$JENA_CP" ] ; then
if [ -z "$JENA_HOME" ]; then
@@ -181,6 +240,9 @@
while [ $# -gt 0 ]
do
ARG=$1
+ ## --tmpdir
+ ## --loc|--location
+ ## --help
case $ARG in
-d|--debug)
# Debug Mode
@@ -226,8 +288,7 @@
esac
done
-if [[ $# -eq 0 ]]
-then
+if [[ $# -eq 0 ]] ; then
abort 1 "No files to load" 1>&2
fi
@@ -239,9 +300,6 @@
[[ -z $TMPDIR ]] && TMPDIR=$LOC
export TMPDIR
-## --tmpdir
-## --loc|--location
-## --help
## TDB1 / TDB2
## @@
@@ -269,74 +327,101 @@
;;
esac
-## Delete database!
+## Don't mess up an existing database!
if [ -e "$LOC" ]; then
## @@ Better
abort 3 "Directory $LOC already exists"
fi
-JAVA="${JAVA:-java}"
-
info "Setup:"
-info " Data: $DATAFILES"
info " Database: $LOC"
-info " Tmpdir: $TMPDIR"
+info " Data: $DATAFILES"
+info " TMPDIR: $TMPDIR"
# Large heap not required.
JVM_ARGS="${JVM_ARGS:--Xmx2G}"
-## Time points.
+## Time point.
TIME_START="$(now)"
-## Node table loading.
+## ======== Node table loading.
if [ "$SYSTEM" == "TDB2" ]; then
+ ## TDB2 only.
+ info
T="$(now)"
info "Load node table"
exec_java $PKG.CmdxBuildNodeTable --loc $LOC --tmpdir "$TMPDIR" $DATAFILES
TIME_NODE_TABLE=$(($(now)-$T))
fi
-## Ingest data, create workfiles
+## ======== Ingest data, creates workfiles
info
info "Ingest data"
T="$(now)"
exec_java $PKG.CmdxIngestData --loc $LOC --tmpdir "$TMPDIR" --triples "$TMPDIR/triples.tmp" --quads "$TMPDIR/quads.tmp" $DATAFILES
TIME_INGEST=$(($(now)-$T))
-## @@ triples.tmp quads.tmp
+## ======== Indexes
+INFO="$TMPDIR/load.json" ## Written by the ingest step; holds the triple/quad counts.
+
+## Bash associative array: index name -> seconds taken to build that index.
+declare -A TIME_IDX
+## Usage: index NAME -- build one index with CmdxBuildIndex and record its time in TIME_IDX.
function index() {
local IDX="$1"
+ info
+ info "Build $IDX"
+ local T ; T="$(now)"
exec_java $PKG.CmdxBuildIndex --loc $LOC --tmpdir "$TMPDIR" --index $IDX \
"$TMPDIR/triples.tmp" "$TMPDIR/quads.tmp"
+ local T_IDX ; T_IDX=$(($(now)-$T))
+ TIME_IDX["$IDX"]=$T_IDX
}
-info
-info "Build SPO"
-T="$(now)"
-index SPO
-TIME_IDX_SPO=$(($(now)-$T))
+## Decide which indexes to generate.
+## TRIPLES_IDX / QUADS_IDX may be set in the environment to override the defaults.
+TRIPLES_DFT="SPO POS OSP"
+QUADS_DFT="GSPO GPOS GOSP SPOG POSG OSPG"
+TRIPLES_IDX="${TRIPLES_IDX:-$TRIPLES_DFT}"
+QUADS_IDX="${QUADS_IDX:-$QUADS_DFT}"
+
+if [ -e "$INFO" ] ; then
+ ## Skip a phase only when ingest recorded exactly zero items for it.
+ ## (An arithmetic -eq would also treat "null" or empty jq output as 0.)
+ TRIPLES="$(jq .triples < "$INFO")"
+ QUADS="$(jq .quads < "$INFO")"
+ if [[ "$TRIPLES" == "0" ]] ; then
+ TRIPLES_IDX=""
+ fi
+ if [[ "$QUADS" == "0" ]] ; then
+ QUADS_IDX=""
+ fi
+fi
+
+## ==== Triples
+
+for IDX in $TRIPLES_IDX ; do
+ index $IDX
+done
+
+## ==== Quads
+
+for IDX in $QUADS_IDX ; do
+ index $IDX
+done
+## ======== Finish
+
+## Delete temp files.
+## rm -f "$TMPDIR"/triples.tmp* "$TMPDIR"/quads.tmp*
info
-info "Build POS"
-T="$(now)"
-index POS
-TIME_IDX_POS=$(($(now)-$T))
-
-info
-info "Build OSP"
-T="$(now)"
-index OSP
-let TIME_IDX_OSP=$(($(now)-$T))
-
-## @@
-#rm "$TMPDIR/triples.tmp" "$TMPDIR/quads.tmp"
-
TIME_FINISH="$(now)"
+## ======== Reporting
TIME_TOTAL=$(($TIME_FINISH-$TIME_START))
+## Ingest
if [ -n "$TIME_NODE_TABLE" ]; then
info "Load node table = $TIME_NODE_TABLE seconds"
fi
@@ -345,8 +430,27 @@
TIME_HMS="$(printf '%02dh %02dm %02ds\n' $((SECS/3600)) $((SECS%3600/60)) $((SECS%60)))"
info "Load ingest data = $TIME_INGEST seconds"
-info "Build index SPO = $TIME_IDX_SPO seconds"
-info "Build index POS = $TIME_IDX_POS seconds"
-info "Build index OSP = $TIME_IDX_OSP seconds"
+
+## Indexes: one report line per index built, triples first, then quads.
+for IDX in $TRIPLES_IDX $QUADS_IDX
+do
+ info "Build index ${IDX} = ${TIME_IDX[${IDX}]} seconds"
+done
+
+## Whole run
info "Overall $TIME_TOTAL seconds"
info "Overall $TIME_HMS"
+
+if [[ -e $INFO ]]
+then
+ printf -v TRIPLES_STR "%'d" "$TRIPLES"
+ printf -v QUADS_STR "%'d" "$QUADS"
+ info "Triples loaded = $TRIPLES_STR"
+ info "Quads loaded = $QUADS_STR"
+ TUPLES=$(( TRIPLES + QUADS ))
+ ## Guard against division by zero when the whole run took under one second.
+ RATE=$(( TUPLES / (TIME_TOTAL > 0 ? TIME_TOTAL : 1) ))
+ printf -v RATE_STR "%'d" "$RATE"
+ info "Overall Rate $RATE_STR tuples per second"
+fi
diff --git a/jena-cmds/src/main/java/tdb2/xloader/AbstractCmdxLoad.java b/jena-cmds/src/main/java/tdb2/xloader/AbstractCmdxLoad.java
index 912d7b1..ffd1056 100644
--- a/jena-cmds/src/main/java/tdb2/xloader/AbstractCmdxLoad.java
+++ b/jena-cmds/src/main/java/tdb2/xloader/AbstractCmdxLoad.java
@@ -51,12 +51,17 @@
protected static ArgDecl argTmpdir = new ArgDecl(true, "tmpdir", "tmp");
protected static ArgDecl argIndex = new ArgDecl(true, "index");
- protected static ArgDecl argSortArgs = new ArgDecl(true, "sortArgs", "sortargs");
+ // Not currently registered (see the commented-out super.add calls). If put back, note there are two different sorts - one for the node table and several for the indexes.
+ protected static ArgDecl argSortNodeTableArgs = new ArgDecl(true, "sortNodeTableArgs");
+ protected static ArgDecl argSortIndexArgs = new ArgDecl(true, "sortIndexArgs");
protected String location = null;
protected String tmpdir = null;
protected String indexName = null;
- protected String sortArgs = null;
+
+ protected String sortNodeTableArgs = null;
+ protected String sortIndexArgs = null;
+
protected List<String> filenames = null;
protected XLoaderFiles loaderFiles = null;
@@ -89,9 +94,9 @@
location = super.getValue(argLocation);
tmpdir = super.getValue(argTmpdir);
indexName = super.getValue(argIndex);
- sortArgs = super.getValue(argSortArgs);
-
+ sortNodeTableArgs = super.getValue(argSortNodeTableArgs);
+ sortIndexArgs = super.getValue(argSortIndexArgs);
if ( location != null )
checkDirectory(location);
@@ -104,7 +109,6 @@
subCheckArgs();
loaderFiles = new XLoaderFiles(tmpdir);
-
}
private void checkDirectory(String dirname) {
diff --git a/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildIndex.java b/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildIndex.java
index 2c494d0..f7b04e8 100644
--- a/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildIndex.java
+++ b/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildIndex.java
@@ -40,6 +40,7 @@
super.add(argLocation, "--loc=", "Database location");
super.add(argTmpdir, "--tmpdir=", "Temporary directory (defaults to --loc)");
super.add(argIndex, "--index=", "Index name");
+ //super.add(argSortIndexArgs, "--sortIndexArgs=", "Specialised argument for the sort for the indexes");
}
@Override
@@ -85,7 +86,6 @@
long items = ProcIndexBuildX.exec(location, indexName, loaderFiles);
long timeMillis = timer.endTimer();
- //FmtLog.info(LOG, "Done - NodeTable - %s seconds", Timer.timeStr(timeMillis));
double xSec = timeMillis/1000.0;
double rate = items/xSec;
diff --git a/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildNodeTable.java b/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildNodeTable.java
index 0132781..4b54e5e 100644
--- a/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildNodeTable.java
+++ b/jena-cmds/src/main/java/tdb2/xloader/CmdxBuildNodeTable.java
@@ -26,6 +26,8 @@
import org.apache.jena.cmd.CmdException;
import org.apache.jena.tdb2.xloader.BulkLoaderX;
import org.apache.jena.tdb2.xloader.ProcNodeTableBuilderX;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CmdxBuildNodeTable extends AbstractCmdxLoad {
@@ -34,13 +36,14 @@
}
protected CmdxBuildNodeTable(String[] argv) {
- super("Terms", argv);
+ super("Nodes", argv);
}
@Override
protected void setCmdArgs() {
super.add(argLocation, "--loc=", "Database location");
super.add(argTmpdir, "--tmpdir=", "Temporary directory (defaults to --loc)");
+ //super.add(argSortNodeTableArgs, "--sortNodeTableArgs=", "Specialised argument for the sort for the node table");
}
@Override
@@ -77,10 +80,12 @@
if ( tmpdir == null )
tmpdir = location;
- Pair<Long/*triples*/, Long/*indexed nodes*/> buildCounts = ProcNodeTableBuilderX.exec(LOG, location, loaderFiles, filenames, sortArgs);
+ Logger LOG1 = LOG;
+ Logger LOG2 = LoggerFactory.getLogger("Terms");
+
+ Pair<Long/*triples or quads*/, Long/*indexed nodes*/> buildCounts = ProcNodeTableBuilderX.exec(LOG1, LOG2, location, loaderFiles, filenames, sortNodeTableArgs);
long timeMillis = timer.endTimer();
- //FmtLog.info(LOG, "Done - NodeTable - %s seconds", Timer.timeStr(timeMillis));
long items = buildCounts.getLeft();
double xSec = timeMillis/1000.0;
@@ -88,7 +93,6 @@
String elapsedStr = BulkLoaderX.milliToHMS(timeMillis);
String rateStr = BulkLoaderX.rateStr(items, timeMillis);
- FmtLog.info(LOG, "NodeTable - %s seconds - %s at %s TPS", Timer.timeStr(timeMillis), elapsedStr, rateStr);
- // XXX Write size as tmp file.
+ FmtLog.info(LOG, "NodeTable - %s seconds - %s at %s terms per second", Timer.timeStr(timeMillis), elapsedStr, rateStr);
}
}
diff --git a/jena-cmds/src/main/java/tdb2/xloader/CmdxLoader.java b/jena-cmds/src/main/java/tdb2/xloader/CmdxLoader.java
index d005787..b14e8de 100644
--- a/jena-cmds/src/main/java/tdb2/xloader/CmdxLoader.java
+++ b/jena-cmds/src/main/java/tdb2/xloader/CmdxLoader.java
@@ -28,7 +28,7 @@
* <p>
* This still requires an external sort programme.
* Normally, xloader is run by script which uses one JVM per operation.
- * Exiting the JVM and starting a new one
+ * Exiting the JVM and starting a new one clears the process state which is beneficial.
* <p>
* This program does not need much RAM. Do not set the heap size large.
* 4Gbytes is enough, usually 2Gbytes is sufficient.
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/BulkLoaderX.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/BulkLoaderX.java
index b8e8fb1..9b053b1 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/BulkLoaderX.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/BulkLoaderX.java
@@ -24,6 +24,23 @@
public static int DataTick = 1_000_000;
public static int DataSuperTick = 10;
+ /**
+ * Whether to compress the triples.tmp and quads.tmp files.
+ * These are read multiple times (once per index build).
+ * When true, the file names get a ".gz" suffix (see XLoaderFiles).
+ */
+ public static boolean CompressDataFiles = true;
+
+ /**
+ * Whether to compress intermediate sort files for the node table.
+ * Off by default: the space this would save is needed later for the final indexes anyway.
+ */
+ public static boolean CompressSortNodeTableFiles = false;
+
+ /**
+ * Whether to compress intermediate sort files for the indexes
+ * (adds "--compress-program=/usr/bin/gzip" to the sort command).
+ */
+ public static boolean CompressSortIndexFiles = true;
+
public static Thread async(Runnable action, String threadName) {
Objects.requireNonNull(action);
Objects.requireNonNull(threadName);
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIndexBuildX.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIndexBuildX.java
index 0da3681..fecaaf7 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIndexBuildX.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIndexBuildX.java
@@ -143,31 +143,48 @@
Process proc2;
OutputStream toSortOutputStream; // Not used. Input is a file.
InputStream fromSortInputStream;
+
try {
//LOG.info("Step : external sort : "+indexName);
//if ( sortArgs != null ) {}
+
List<String> sortCmdBasics = Arrays.asList(
"sort",
"--temporary-directory="+TMPDIR, "--buffer-size=50%",
"--parallel=2", "--unique"
//, "--compress-program=/usr/bin/gzip"
- );
+ );
List<String> sortCmd = new ArrayList<>(sortCmdBasics);
+ if ( BulkLoaderX.CompressSortIndexFiles )
+ sortCmd.add("--compress-program=/usr/bin/gzip");
sortCmd.addAll(sortKeyArgs);
- // Add the file to sort.
- sortCmd.add(datafile);
+ // Add the file to sort if not compressed.
+ if ( ! BulkLoaderX.CompressDataFiles )
+ sortCmd.add(datafile);
+ // else this process will decompress and send the data.
+
proc2 = new ProcessBuilder(sortCmd).start();
- // To process. Not used.
+
+ // To process. Not used if uncompressed file.
toSortOutputStream = proc2.getOutputStream();
// From process
fromSortInputStream = proc2.getInputStream(); // Needs buffering
- // Debug sort process.
+// // Debug sort process.
// InputStream fromSortErrortStream = proc2.getErrorStream();
// IOUtils.copy(fromSortErrortStream, System.err);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
+ if ( BulkLoaderX.CompressDataFiles ) {
+ // Decompress the data file (IO.openFile handles .gz) and feed it to sort's stdin.
+ // Closing the pipe signals end-of-input to sort; both streams are closed even on error.
+ try ( InputStream inData = IO.openFile(datafile);
+ OutputStream toSort = toSortOutputStream ) {
+ inData.transferTo(toSort);
+ } catch (IOException ex) { IO.exception(ex); }
+ }
+
// From sort, buffered.
InputStream input = IO.ensureBuffered(fromSortInputStream);
// This thread - run builder.
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIngestDataX.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIngestDataX.java
index 78a144f..9235e7f 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIngestDataX.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcIngestDataX.java
@@ -18,13 +18,18 @@
package org.apache.jena.tdb2.xloader;
+import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import org.apache.jena.atlas.io.IO;
+import org.apache.jena.atlas.json.JSON;
+import org.apache.jena.atlas.json.JsonObject;
import org.apache.jena.atlas.lib.BitsLong;
import org.apache.jena.atlas.lib.DateTimeUtils;
+import org.apache.jena.atlas.lib.Pair;
import org.apache.jena.atlas.lib.Timer;
+import org.apache.jena.atlas.logging.FmtLog;
import org.apache.jena.dboe.base.file.Location;
import org.apache.jena.dboe.sys.Names;
import org.apache.jena.graph.Node;
@@ -70,23 +75,41 @@
ProgressMonitor monitor = ProgressMonitorOutput.create(LOG, "Data", BulkLoaderX.DataTick, BulkLoaderX.DataSuperTick);
// WriteRows does it's own buffering and has direct write-to-buffer.
// Do not buffer here.
+ // Adds gzip processing.
OutputStream outputTriples = IO.openOutputFile(loaderFiles.triplesFile);
OutputStream outputQuads = IO.openOutputFile(loaderFiles.quadsFile);
- dsg.executeWrite(()->build(dsg, monitor,
- outputTriples, outputQuads,
- datafiles));
+ OutputStream outT = outputTriples;
+ OutputStream outQ = outputQuads;
+ dsg.executeWrite(() -> {
+ Pair<Long, Long> p = build(dsg, monitor, outT, outQ, datafiles);
+ String str = DateTimeUtils.nowAsXSDDateTimeString();
+ long cTriple = p.getLeft();
+ long cQuad = p.getRight();
+ FmtLog.info(LOG, "Triples = %,d ; Quads = %,d", cTriple, cQuad);
+ JsonObject obj = JSON.buildObject(b->{
+ b.pair("ingested", str);
+ b.key("data").startArray();
+ datafiles.forEach(fn->b.value(fn));
+ b.finishArray();
+ b.pair("triples", cTriple);
+ b.pair("quads", cQuad);
+ });
+ try ( OutputStream out = IO.openOutputFile(loaderFiles.loadInfo) ) {
+ JSON.write(out, obj);
+ } catch (IOException ex) { IO.exception(ex); }
+ });
TDBInternal.expel(dsg);
SystemIRIx.setProvider(provider);
}
- private static void build(DatasetGraph dsg, ProgressMonitor monitor,
+ private static Pair<Long, Long> build(DatasetGraph dsg, ProgressMonitor monitor,
OutputStream outputTriples, OutputStream outputQuads,
List<String> datafiles) {
DatasetGraphTDB dsgtdb = TDBInternal.getDatasetGraphTDB(dsg);
outputTriples = IO.ensureBuffered(outputTriples);
outputQuads = IO.ensureBuffered(outputQuads);
- NodeTableBuilder sink = new NodeTableBuilder(dsgtdb, monitor, outputTriples, outputQuads, false);
+ IngestData sink = new IngestData(dsgtdb, monitor, outputTriples, outputQuads, false);
Timer timer = new Timer();
timer.startTimer();
// [BULK] XXX Start monitor on first item from parser.
@@ -102,6 +125,9 @@
IO.close(outputTriples);
IO.close(outputQuads);
+ long cTriple = sink.tripleCount();
+ long cQuad = sink.quadCount();
+
// ---- Stats
// See Stats class.
@@ -120,21 +146,22 @@
String str = String.format("Total: %,d tuples : %,.2f seconds : %,.2f tuples/sec [%s]",
total, elapsedSecs, rate, DateTimeUtils.nowAsString());
LOG.info(str);
+ return Pair.create(cTriple, cQuad);
}
- static class NodeTableBuilder implements StreamRDF
- {
+ static class IngestData implements StreamRDF {
private DatasetGraphTDB dsg;
private NodeTable nodeTable;
+ long countTriples = 0;
+ long countQuads = 0;
private WriteRows writerTriples;
private WriteRows writerQuads;
private ProgressMonitor monitor;
private StatsCollectorNodeId stats;
- NodeTableBuilder(DatasetGraphTDB dsg, ProgressMonitor monitor,
- OutputStream outputTriples, OutputStream outputQuads,
- boolean collectStats)
- {
+ IngestData(DatasetGraphTDB dsg, ProgressMonitor monitor,
+ OutputStream outputTriples, OutputStream outputQuads,
+ boolean collectStats) {
this.dsg = dsg;
this.monitor = monitor;
NodeTupleTable ntt = dsg.getTripleTable().getNodeTupleTable();
@@ -145,62 +172,58 @@
this.stats = new StatsCollectorNodeId(nodeTable);
}
- //@Override
- public void startBulk()
- {}
+ // @Override
+ public void startBulk() {}
@Override
- public void start()
- {}
+ public void start() {}
@Override
- public void finish()
- {}
+ public void finish() {}
- //@Override
- public void finishBulk()
- {
+ // @Override
+ public void finishBulk() {
writerTriples.flush();
writerQuads.flush();
nodeTable.sync();
- //dsg.getStoragePrefixes().sync();
+ // dsg.getStoragePrefixes().sync();
}
@Override
- public void triple(Triple triple)
- {
+ public void triple(Triple triple) {
+ countTriples++;
Node s = triple.getSubject();
Node p = triple.getPredicate();
Node o = triple.getObject();
- process(Quad.tripleInQuad,s,p,o);
+ process(Quad.tripleInQuad, s, p, o);
}
@Override
- public void quad(Quad quad)
- {
+ public void quad(Quad quad) {
+ countQuads++;
Node s = quad.getSubject();
Node p = quad.getPredicate();
Node o = quad.getObject();
Node g = null;
// Union graph?!
- if ( ! quad.isTriple() && ! quad.isDefaultGraph() )
+ if ( !quad.isTriple() && !quad.isDefaultGraph() )
g = quad.getGraph();
- process(g,s,p,o);
+ process(g, s, p, o);
}
// --> From NodeIdFactory
private static long encode(NodeId nodeId) {
long x = nodeId.getPtrLocation(); // Should be "getValue"
- switch(nodeId.type()) {
- case PTR:
+ switch (nodeId.type()) {
+ case PTR :
return x;
- case XSD_DOUBLE:
+ case XSD_DOUBLE :
// XSD_DOUBLE is special.
// Set value bit (63) and bit 62
x = DoubleNode62.insertType(x);
return x;
- default:
+ default :
// Bit 62 is zero - tag is for doubles.
x = BitsLong.pack(x, nodeId.getTypeValue(), 56, 62);
// Set the high, value bit.
@@ -239,11 +262,16 @@
monitor.tick();
}
- public StatsCollectorNodeId getCollector() { return stats; }
+ public StatsCollectorNodeId getCollector() {
+ return stats;
+ }
+
+ /** Number of triples received via {@link #triple(Triple)}. */
+ public long tripleCount() { return countTriples; }
+
+ /** Number of quads received via {@link #quad(Quad)}. */
+ public long quadCount() { return countQuads; }
@Override
- public void base(String base)
- {}
+ public void base(String base) {}
@Override
public void prefix(String prefix, String iri) {
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcNodeTableBuilderX.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcNodeTableBuilderX.java
index 8bbe2b9..e13f045 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcNodeTableBuilderX.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/ProcNodeTableBuilderX.java
@@ -24,6 +24,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -54,12 +56,12 @@
import org.apache.jena.riot.thrift.wire.RDF_Term;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.core.Quad;
-import org.apache.jena.tdb2.DatabaseMgr;
-import org.apache.jena.tdb2.TDBException;
-import org.apache.jena.tdb2.lib.NodeLib;
import org.apache.jena.system.progress.ProgressIterator;
import org.apache.jena.system.progress.ProgressMonitorOutput;
import org.apache.jena.system.progress.ProgressStreamRDF;
+import org.apache.jena.tdb2.DatabaseMgr;
+import org.apache.jena.tdb2.TDBException;
+import org.apache.jena.tdb2.lib.NodeLib;
import org.apache.jena.tdb2.store.DatasetGraphTDB;
import org.apache.jena.tdb2.store.Hash;
import org.apache.jena.tdb2.store.NodeId;
@@ -79,10 +81,13 @@
public class ProcNodeTableBuilderX {
/** Pair<triples, indexed nodes> */
- // [BULK} Output, not return.
- public static Pair<Long, Long> exec(Logger LOG, String DB, XLoaderFiles loaderFiles, List<String> datafiles, String sortArgs) {
+ // [BULK] Output, not return.
+ public static Pair<Long, Long> exec(Logger LOG1, Logger LOG2, String DB, XLoaderFiles loaderFiles, List<String> datafiles, String sortNodeTableArgs) {
//Threads - 1 parser, 1 builder, 2 sort.
- // Excludes counting inlines.
+ // Steps:
+ // 1 - parse and pipe terms to sort
+ // 2 - sort
+ // 3 - build node table from unique sort
IRIProvider provider = SystemIRIx.getProvider();
//SystemIRIx.setProvider(new IRIProviderAny());
@@ -99,22 +104,27 @@
Process proc2;
try {
//LOG.info("Step : external sort");
- String[] sortCmd =
- { "sort",
+ List<String> sortCmdBasics = Arrays.asList(
+ "sort",
"--temporary-directory="+loaderFiles.TMPDIR, "--buffer-size=50%",
"--parallel=2", "--unique",
- // Don't compress. There will be enough space because later we work on indexes.
//"--compress-program=/usr/bin/gzip",
"--key=1,1"
- };
- if ( sortArgs != null ) {}
+ );
+ List<String> sortCmd = new ArrayList<>(sortCmdBasics);
+
+ //if ( sortNodeTableArgs != null ) {}
+
+ // See javadoc for CompressSortNodeTableFiles - usually false
+ if ( BulkLoaderX.CompressSortNodeTableFiles )
+ sortCmd.add("--compress-program=/usr/bin/gzip");
proc2 = new ProcessBuilder(sortCmd).start();
+
// To process.
toSortOutputStream = proc2.getOutputStream(); // Needs buffering
// From process
fromSortInputStream = proc2.getInputStream(); // Needs buffering
-
// // Debug sort process.
// InputStream fromSortErrortStream = proc2.getErrorStream();
// IOUtils.copy(fromSortErrortStream, System.err);
@@ -132,7 +142,7 @@
//LOG.info("Step : parse & send to external sort");
Runnable task1 = ()->{
- ProgressMonitorOutput monitor = ProgressMonitorOutput.create(LOG, "Nodes", tickPoint, superTick);
+ ProgressMonitorOutput monitor = ProgressMonitorOutput.create(LOG1, "Nodes", tickPoint, superTick);
OutputStream output = IO.ensureBuffered(toSortOutputStream);
// Counting.
StreamRDF worker = new NodeHashTmpStream(output);
@@ -160,14 +170,14 @@
double xSec = x/1000.0;
double rate = count/xSec;
- FmtLog.info(LOG, "== Parse: %s seconds : %,d triples/quads %,.0f TPS", Timer.timeStr(x), count, rate);
+ FmtLog.info(LOG1, "== Parse: %s seconds : %,d triples/quads %,.0f TPS", Timer.timeStr(x), count, rate);
};
// [BULK] XXX AsyncParser.asyncParse(files, output)
Thread thread1 = async(task1, "AsyncParser");
- //LOG.info("Step : read external sort, build node-data file, build index");
- Runnable task2 = ()->{
+ // Step3: build node table.
+ Runnable task3 = ()->{
Timer timer = new Timer();
// Don't start timer until sort send something
InputStream input = IO.ensureBuffered(fromSortInputStream);
@@ -176,14 +186,12 @@
BufferChannel blkState = FileFactory.createBufferChannel(fileSet, Names.extBptState);
long idxTickPoint = BulkLoaderX.DataTick;
int idxSuperTick = BulkLoaderX.DataSuperTick;
- ProgressMonitorOutput monitor = ProgressMonitorOutput.create(LOG, "Index", idxTickPoint, idxSuperTick);
-
- // SLOW
+ ProgressMonitorOutput monitor = ProgressMonitorOutput.create(LOG2, "Index", idxTickPoint, idxSuperTick);
// Library of tools!
dsg.executeWrite(()->{
BinaryDataFile objectFile = nodeTable.getData();
- Iterator<Record> rIter = records(LOG, input, objectFile);
+ Iterator<Record> rIter = records(LOG2, input, objectFile);
rIter = new ProgressIterator<>(rIter, monitor);
// Record of (hash, nodeId)
BPlusTree bpt1 = (BPlusTree)(nodeTable.getIndex());
@@ -210,23 +218,23 @@
long count = monitor.getTicks();
countIndexedNodes.set(count);
String rateStr = BulkLoaderX.rateStr(count, x);
- FmtLog.info(LOG, "== Index: %s seconds : %,d indexed RDF terms : %s PerSecond", Timer.timeStr(x), count, rateStr);
+ FmtLog.info(LOG2, "== Index: %s seconds : %,d indexed RDF terms : %s PerSecond", Timer.timeStr(x), count, rateStr);
};
- Thread thread2 = async(task2, "AsyncBuild");
+ Thread thread3 = async(task3, "AsyncBuild");
try {
int x = proc2.waitFor();
if ( x != 0 )
- FmtLog.error(LOG, "Sort RC = %d", x);
+ FmtLog.error(LOG2, "Sort RC = %d", x);
else
- LOG.info("Sort finished");
+ LOG2.info("Sort finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
BulkLoaderX.waitFor(thread1);
- BulkLoaderX.waitFor(thread2);
+ BulkLoaderX.waitFor(thread3);
return Pair.create(countParseTicks.get(), countIndexedNodes.get());
}
@@ -336,7 +344,7 @@
static class NodeHashTmpStream implements StreamRDF {
private final OutputStream outputData;
- private CacheSet<Node> cache = CacheFactory.createCacheSet(100_000);
+ private CacheSet<Node> cache = CacheFactory.createCacheSet(500_000);
NodeHashTmpStream(OutputStream outputFile) {
this.outputData = outputFile;
@@ -378,7 +386,6 @@
return;
cache.add(node);
// -- Hash of node
- //SystemTDB.LenNodeHash, SystemTDB.SizeOfNodeId
NodeLib.setHash(hash, node);
try {
byte k[] = hash.getBytes();
@@ -388,7 +395,6 @@
outputData.write(' ');
write(outputData, tBytes);
outputData.write('\n');
-
} catch (TException e) {
e.printStackTrace();
} catch (IOException e) {
@@ -411,6 +417,5 @@
public void finish() {
IO.flush(outputData);
}
-
}
}
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/XLoaderFiles.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/XLoaderFiles.java
index a14c19d..4d57578 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/XLoaderFiles.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/XLoaderFiles.java
@@ -26,7 +26,7 @@
// Fixed file names in temporary directory
static final String nameTriplesFile = "triples.tmp";
static final String nameQuadsFile = "quads.tmp";
- static final String nameLoadInfo = "load.txt";
+ static final String nameLoadInfo = "load.json";
// Names.
public final String TMPDIR;
@@ -34,10 +34,12 @@
public final String quadsFile;
public final String loadInfo;
public XLoaderFiles(String TMPDIR) {
+ // Compressed data files get a ".gz" suffix; IO.openFile / IO.openOutputFile are relied on to (de)compress them.
+ String ext = BulkLoaderX.CompressDataFiles ? ".gz" : "";
+
this.TMPDIR = Objects.requireNonNull(TMPDIR);
Path loc = Path.of(TMPDIR);
- triplesFile = loc.resolve(nameTriplesFile).toString();
- quadsFile = loc.resolve(nameQuadsFile).toString();
+ triplesFile = loc.resolve(nameTriplesFile).toString()+ext;
+ quadsFile = loc.resolve(nameQuadsFile).toString()+ext;
loadInfo = loc.resolve(nameLoadInfo).toString();
}
}
\ No newline at end of file
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/package-info.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/package-info.java
new file mode 100644
index 0000000..3f9c1eb
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/xloader/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.xloader;
+