blob: 27c914e1f5731553d68bb9ebc07b0cf643432fa7 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*/
package org.apache.jena.tdb2.store;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.jena.atlas.lib.InternalErrorException;
import org.apache.jena.atlas.lib.tuple.Tuple;
import org.apache.jena.dboe.storage.StorageRDF;
import org.apache.jena.dboe.transaction.txn.Transaction;
import org.apache.jena.dboe.transaction.txn.TransactionalSystem;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
import org.apache.jena.sparql.core.DatasetChanges;
import org.apache.jena.sparql.core.Quad;
import org.apache.jena.sparql.core.QuadAction;
/** {@link StorageRDF} for TDB2 */
public class StorageTDB implements StorageRDF {
// SWITCHING. This could be the switch point, not the DatasetGraph. Probably makes little difference.
private TripleTable tripleTable;
private QuadTable quadTable;
private TransactionalSystem txnSystem;
// SWITCHING.
// In notifyAdd and notifyDelete, check whether the change is a real change or not.
// e.g. Adding a quad already present is not a real change.
// However, that requires looking in the data so incurs a cost.
// Normally, "false". "QuadAction.NO_*" are not used.
private final boolean checkForChange = false;
private boolean closed = false;
public StorageTDB(TransactionalSystem txnSystem, TripleTable tripleTable, QuadTable quadTable) {
this.txnSystem = txnSystem;
this.tripleTable = tripleTable;
this.quadTable = quadTable;
}
public QuadTable getQuadTable() {
checkActive();
return quadTable;
}
public TripleTable getTripleTable() {
checkActive();
return tripleTable;
}
private void checkActive() {}
// Watching changes (add, delete, deleteAny)
private DatasetChanges monitor = null;
public void setMonitor(DatasetChanges changes) {
monitor = changes;
}
public void unsetMonitor(DatasetChanges changes) {
if ( monitor != changes )
throw new InternalErrorException();
monitor = null;
}
private final void notifyAdd(Node g, Node s, Node p, Node o) {
if ( monitor == null )
return;
QuadAction action = QuadAction.ADD;
if ( checkForChange ) {
if ( contains(g, s, p, o) )
action = QuadAction.NO_ADD;
}
monitor.change(action, g, s, p, o);
}
private final void notifyDelete(Node g, Node s, Node p, Node o) {
if ( monitor == null )
return;
QuadAction action = QuadAction.DELETE;
if ( checkForChange ) {
if ( !contains(g, s, p, o) )
action = QuadAction.NO_DELETE;
}
monitor.change(action, g, s, p, o);
}
@Override
public void add(Node s, Node p, Node o) {
checkActive();
ensureWriteTxn();
notifyAdd(null, s, p, o);
getTripleTable().add(s, p, o);
}
@Override
public void add(Node g, Node s, Node p, Node o) {
checkActive();
ensureWriteTxn();
notifyAdd(g, s, p, o);
getQuadTable().add(g, s, p, o);
}
@Override
public void delete(Node s, Node p, Node o) {
checkActive();
ensureWriteTxn();
notifyDelete(null, s, p, o);
getTripleTable().delete(s, p, o);
}
@Override
public void delete(Node g, Node s, Node p, Node o) {
checkActive();
ensureWriteTxn();
notifyDelete(g, s, p, o);
getQuadTable().delete(g, s, p, o);
}
@Override
public void removeAll(Node s, Node p, Node o) {
checkActive();
ensureWriteTxn();
removeWorker(() -> tripleTable.getNodeTupleTable().findAsNodeIds(s,p,o),
x -> tripleTable.getNodeTupleTable().getTupleTable().delete(x) );
}
@Override
public void removeAll(Node g, Node s, Node p, Node o) {
checkActive();
ensureWriteTxn();
removeWorker(() -> quadTable.getNodeTupleTable().findAsNodeIds(g,s,p,o),
x -> quadTable.getNodeTupleTable().getTupleTable().delete(x) );
}
private static final int DeleteBufferSize = 1000;
/** General purpose "remove by pattern" code */
private void removeWorker(Supplier<Iterator<Tuple<NodeId>>> finder, Consumer<Tuple<NodeId>> deleter) {
// Allocate buffer once.
// Not Java11 @SuppressWarnings("unchecked")
//Tuple<NodeId>[] buffer = (Tuple<NodeId>[])new Object[DeleteBufferSize];
Object[] buffer = new Object[DeleteBufferSize];
while (true) {
Iterator<Tuple<NodeId>> iter = finder.get();
// Get a slice
int idx = 0;
for (; idx < DeleteBufferSize; idx++ ) {
if ( !iter.hasNext() )
break;
buffer[idx] = iter.next();
}
// Delete them.
for ( int i = 0; i < idx; i++ ) {
@SuppressWarnings("unchecked")
Tuple<NodeId> x = (Tuple<NodeId>)buffer[i];
deleter.accept(x);
buffer[i] = null;
}
// Finished?
if ( idx < DeleteBufferSize )
break;
}
}
@Override
public Iterator<Quad> find(Node g, Node s, Node p, Node o) {
checkActive();
requireTxn();
return getQuadTable().find(g, s, p, o);
}
@Override
public Iterator<Triple> find(Node s, Node p, Node o) {
checkActive();
requireTxn();
return getTripleTable().find(s, p, o);
}
// @Override
// public Stream<Quad> stream(Node g, Node s, Node p, Node o) {
// checkActive();
// requireTxn();
// return Iter.asStream(getQuadTable().find(g, s, p, o));
// }
//
// @Override
// public Stream<Triple> stream(Node s, Node p, Node o) {
// checkActive();
// requireTxn();
// return Iter.asStream(getTripleTable().find(s, p, o));
// }
@Override
public boolean contains(Node s, Node p, Node o) {
checkActive();
requireTxn();
return getTripleTable().find(s, p, o).hasNext();
}
@Override
public boolean contains(Node g, Node s, Node p, Node o) {
checkActive();
requireTxn();
return getQuadTable().find(g, s, p, o).hasNext();
}
// This test is also done by the transactional components so no need to test here.
private void requireTxn() {}
// private void requireTxn() {
// if ( ! txnSystem.isInTransaction() )
// throw new TransactionException("Not on a transaction");
// }
private void ensureWriteTxn() {
Transaction txn = txnSystem.getThreadTransaction();
txn.ensureWriteTxn();
}
}