blob: e684e68cd8519656d39d78a0c4b94119cd3cc81c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.tdb2.loader.main;
import static org.apache.jena.tdb2.loader.main.PhasedOps.acquire;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import org.apache.jena.atlas.lib.ArrayUtils;
import org.apache.jena.atlas.lib.tuple.Tuple;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.dboe.transaction.txn.Transaction;
import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
import org.apache.jena.query.TxnType;
import org.apache.jena.tdb2.loader.base.BulkStartFinish;
import org.apache.jena.tdb2.loader.base.CoLib;
import org.apache.jena.tdb2.loader.base.MonitorOutput;
import org.apache.jena.tdb2.store.NodeId;
import org.apache.jena.tdb2.store.tupletable.TupleIndex;
/**
* Build index(es).
* Provides a function {@link #index()} that should be called from another thread
* to deliver chunks of tuples ({@code List<Tuple<NodeId>>}).
* Each chunk should be the same Tuyple length and this must correspond to the length of the {@link TupleIndex}s being loaded.
* <p>
* This class creates one thread per {@link TupleIndex}.
*/
public class Indexer implements BulkStartFinish {
private BlockingQueue<List<Tuple<NodeId>>>[] pipesTripleIndexers;
private final int N;
private final MonitorOutput output;
private TupleIndex[] indexes;
private final Semaphore termination = new Semaphore(0);
@SuppressWarnings("unchecked")
public Indexer(MonitorOutput output, TupleIndex... idxTriples) {
pipesTripleIndexers = ArrayUtils.alloc(BlockingQueue.class, idxTriples.length);
this.N = idxTriples.length;
this.indexes = Arrays.copyOf(idxTriples, N);
this.output = output;
for ( int i = 0; i < N ; i++ ) {
pipesTripleIndexers[i] = new ArrayBlockingQueue<List<Tuple<NodeId>>>(LoaderConst.QueueSizeTuples);
}
}
/** Return a function that delivers multiple {@code List<Tuple<NodeId>>>} to this indexer */
public Destination<Tuple<NodeId>> index() {
return this::index;
}
private void index(List<Tuple<NodeId>> chunk) {
for ( int i = 0; i < N ; i++ ) {
try {
pipesTripleIndexers[i].put(chunk);
}
catch (InterruptedException e) {
String name = indexes[i].getName();
Log.error(this, "Interrupted: "+name, e);
throw new RuntimeException(e);
}
}
}
/** Start the threads that will do the indexing */
@Override
public void startBulk() {
for ( int i = 0; i < N ; i++ ) {
TupleIndex idx = indexes[i];
BlockingQueue<List<Tuple<NodeId>>> pipe = pipesTripleIndexers[i];
new Thread(()->stageIndex(pipe, idx)).start();
}
}
/** Wait for all the indexing threads to complete. */
@Override
public void finishBulk() {
//output.print("Wait for %d indexers", N);
acquire(termination, N);
}
private void stageIndex(BlockingQueue<List<Tuple<NodeId>>> pipe, TupleIndex idx) {
TransactionCoordinator coordinator = CoLib.newCoordinator();
CoLib.add(coordinator, idx);
CoLib.start(coordinator);
Transaction transaction = coordinator.begin(TxnType.WRITE);
boolean workHasBeenDone;
try {
Destination<Tuple<NodeId>> loader = loadTuples(idx);
for (;;) {
List<Tuple<NodeId>> tuples = pipe.take();
if ( tuples.isEmpty() )
break;
loader.deliver(tuples);
}
workHasBeenDone = ! idx.isEmpty();
transaction.commit();
} catch (Exception ex) {
Log.error(this, "Interrupted", ex);
transaction.abort();
workHasBeenDone = false;
}
CoLib.finish(coordinator);
if ( workHasBeenDone )
output.print("Finish - index %s", idx.getName());
termination.release();
}
private static Destination<Tuple<NodeId>> loadTuples(TupleIndex index) {
return (List<Tuple<NodeId>> tuples) -> {
for(Tuple<NodeId> tuple : tuples)
index.add(tuple);
};
}
}