blob: 20ec9d94665e1cafd2d30793db4a0ba3b54b5b3e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.riot.system;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.jena.atlas.iterator.IteratorCloseable;
import org.apache.jena.atlas.iterator.IteratorSlotted;
import org.apache.jena.atlas.lib.InternalErrorException;
import org.apache.jena.atlas.logging.FmtLog;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParser;
import org.apache.jena.riot.RDFParserBuilder;
import org.apache.jena.riot.RiotException;
import org.apache.jena.riot.SysRIOT;
import org.apache.jena.riot.lang.RiotParsers;
import org.apache.jena.sparql.core.Quad;
import org.apache.jena.util.iterator.ClosableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Async parsing - parses a list of files or URLs on a separate thread.
* <p>
* The parser runs on a separate thread. It's output is sent to a StreamRDF on the
* callers thread. The parser output is batched into blocks and placed on a
* finite-sized concurrent {@link BlockingQueue}.
* <p>
* There are overheads, so this is only beneficial in some situations. Delivery to
* the StreamRDF has an initial latency while the first batch of work is accumulated.
* Using the {@link AsyncParserBuilder} gives control over the chunk size such that initial latency
* can be reduced at the cost of possibly decreasing the overall throughput.
* <p>
* Closing the returned {@link ClosableIterator}s terminates the parsing
* process and closes the involved resources.
*/
public class AsyncParser {
static final Logger LOG = LoggerFactory.getLogger(AsyncParser.class);
static final ErrorHandler dftErrorHandler = createDefaultErrorhandler(LOG);
static final int dftChunkSize = 100_000;
// There is no point letting the parser get a long way ahead.
static final int dftQueueSize = 10;
static final List<EltStreamRDF> END = List.of();
private static final StreamRDF alwaysFailingStreamRdf = new StreamToElements(elt -> { throw new RuntimeException(new InterruptedException()); });
/* Destination states for handling concurrent abort */
/** Normal operation: Accept incoming data */
private static final int RUNNING = 0;
/** Abort requested: The next chunk will trigger placing the poison on the queue and transition to STOPPED state */
private static final int ABORTING = 1;
/** Stopped (regardless whether due to normal or exceptional cause): Any further incoming data raises an exception */
private static final int STOPPED = 2;
private AsyncParser() {}
/**
* Function that reads a file or GETs a URL, parses the content on a separate
* thread and sends the output to a StreamRDF on the callers thread.
*/
public static void asyncParse(String fileOrURL, StreamRDF output) {
AsyncParser.of(fileOrURL).asyncParseSources(output);
}
/**
* Function that parses content from a list of files or URLs on a separate thread
* and sends the output to a StreamRDF on the callers thread.
*/
public static void asyncParse(List<String> filesOrURLs, StreamRDF output) {
AsyncParser.ofLocations(filesOrURLs).asyncParseSources(output);
}
/**
* Function to that parses an {@link InputStream} on a separate thread
* and sends the output to a StreamRDF on the callers thread.
*/
public static void asyncParse(InputStream input, Lang lang, String baseURI, StreamRDF output) {
AsyncParser.of(input, lang, baseURI).asyncParseSources(output);
}
/**
* Parse a number of sources ({@link RDFParserBuilder RDFParserBuilders}) on a separate thread and
* send the output to a StreamRDF on the callers thread.
*/
public static void asyncParseSources(List<RDFParserBuilder> sources, StreamRDF output) {
AsyncParser.ofSources(sources).asyncParseSources(output);
}
/** Pull parser - triples */
public static IteratorCloseable<Triple> asyncParseTriples(String fileOrURL) {
return AsyncParser.of(fileOrURL).asyncParseTriples();
}
/** Pull parser - triples */
public static IteratorCloseable<Triple> asyncParseTriples(List<String> filesOrURLs) {
return AsyncParser.ofLocations(filesOrURLs).asyncParseTriples();
}
/**
* Pull parser - triples.
* <p>
* See also {@link RiotParsers#createIteratorNTriples}.
*/
public static IteratorCloseable<Triple> asyncParseTriples(InputStream input, Lang lang, String baseURI) {
return AsyncParser.of(input, lang, baseURI).asyncParseTriples();
}
/** Pull parser - quads */
public static IteratorCloseable<Quad> asyncParseQuads(String fileOrURL) {
return AsyncParser.of(fileOrURL).asyncParseQuads();
}
/** Pull parser - quads */
public static IteratorCloseable<Quad> asyncParseQuads(List<String> filesOrURLs) {
return AsyncParser.ofLocations(filesOrURLs).asyncParseQuads();
}
/**
* Pull parser - quads.
* <p>
* See also {@link RiotParsers#createIteratorNQuads}.
*/
public static IteratorCloseable<Quad> asyncParseQuads(InputStream input, Lang lang, String baseURI) {
return AsyncParser.of(input, lang, baseURI).asyncParseQuads();
}
/** Create an {@link AsyncParserBuilder} from a file or URL. */
public static AsyncParserBuilder of(String fileOrURL) {
Objects.requireNonNull(fileOrURL);
return ofLocations(List.of(fileOrURL));
}
/** Create an {@link AsyncParserBuilder} from a set of files and/or URLs. */
public static AsyncParserBuilder ofLocations(List<String> filesOrURLs) {
Objects.requireNonNull(filesOrURLs);
LOG.debug("Parse: "+filesOrURLs);
return new AsyncParserBuilder(urlsToSource(filesOrURLs));
}
/** Create an {@link AsyncParserBuilder} from an input stream. */
public static AsyncParserBuilder of(InputStream input, Lang lang, String baseURI) {
Objects.requireNonNull(input);
Objects.requireNonNull(lang);
return ofSources(inputStreamToSource(input, lang, baseURI));
}
/** Create an {@link AsyncParserBuilder} from an {@link RDFParserBuilder}. */
public static AsyncParserBuilder of(RDFParserBuilder source) {
Objects.requireNonNull(source);
return ofSources(Arrays.asList(source));
}
/** Create an {@link AsyncParserBuilder} from a set of {@link RDFParserBuilder} instances. */
public static AsyncParserBuilder ofSources(List<RDFParserBuilder> sources) {
Objects.requireNonNull(sources);
return new AsyncParserBuilder(sources);
}
/*
* Internals
*/
private static List<RDFParserBuilder> urlsToSource(List<String> filesOrURLs) {
return filesOrURLs.stream().map(
uriOrFile -> RDFParser.source(uriOrFile).errorHandler(dftErrorHandler))
.collect(Collectors.toList());
}
/** Create a source object from the given arguments that is suitable for use with
* {@link #asyncParseIterator(List)} which return*/
private static List<RDFParserBuilder> inputStreamToSource(InputStream input, Lang lang, String baseURI) {
return List.of(RDFParser.source(input).lang(lang).base(baseURI).errorHandler(dftErrorHandler));
}
/**
* This method raises a caller-thread-side exception for a throwable returned by the parser thread.
* If that throwable is a RuntimeException then the caller side's stacktrace is added
* as a suppressed exception in order to retain the original exception type.
* Any other type of throwable raises a generic RuntimeException with it as the cause.
*/
private static void raiseException(Throwable throwable) {
if (throwable instanceof RuntimeException) {
RuntimeException e = (RuntimeException)throwable;
e.addSuppressed(new RuntimeException("Encountered error element from parse thread"));
throw e;
}
throw new RuntimeException("Encountered error element from parse thread", throwable);
}
static Function<EltStreamRDF, Triple> elt2Triple = x-> {
if ( x.isException() ) {
raiseException(x.exception());
}
if ( x.isQuad() ) {
Quad quad = x.quad();
Node g = quad.getGraph();
if ( g == Quad.tripleInQuad || Quad.isDefaultGraph(g) )
return quad.asTriple();
return null;
}
return x.triple();
};
static Function<EltStreamRDF, Quad> elt2Quad = x-> {
if ( x.isException() ) {
raiseException(x.exception());
}
if ( x.isTriple() )
return Quad.create(Quad.defaultGraphIRI, x.triple());
return x.quad();
};
// Iterator that stops when an end marker is seen.
static <X> IteratorCloseable<X> blockingIterator(Runnable closeAction, BlockingQueue<X> queue, Predicate<X> endTest) {
return new IteratorSlotted<>() {
boolean ended = false;
@Override
protected X moveToNext() {
try {
X x = null;
if (!ended) {
x = queue.take();
}
if ( endTest.test(x) ) {
ended = true;
return null;
}
return x;
} catch (InterruptedException e) {
ended = true;
return null;
}
}
@Override
protected boolean hasMore() {
return !ended;
}
@Override
protected void closeIterator() {
closeAction.run();
}
};
}
static ErrorHandler createDefaultErrorhandler(Logger LOG1) {
return new ErrorHandler() {
@Override
public void warning(String message, long line, long col)
{ LOG1.warn(SysRIOT.fmtMessage(message, line, col)); }
@Override
public void error(String message, long line, long col) {
throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
}
@Override
public void fatal(String message, long line, long col) {
throw new RiotException(SysRIOT.fmtMessage(message, line, col)) ;
}
};
}
/** This class's purpose is to run a set of RDF parsers and place their generated elements on a queue.
* Thereby errors and abort requests are handled. */
private static class Task implements Runnable {
private Logger logger;
private List<RDFParserBuilder> sources;
private BlockingQueue<List<EltStreamRDF>> queue;
private int chunkSize;
private Predicate<EltStreamRDF> prematureDispatch;
// Destination resources are initialized upon calling run();
private EltStreamBatcher<EltStreamRDF> batcher;
private StreamRDF generatorStreamRdf;
private AtomicInteger destinationState = new AtomicInteger(RUNNING);
private boolean errorEncountered = false;
public Task(List<RDFParserBuilder> sources, BlockingQueue<List<EltStreamRDF>> queue, int chunkSize, Predicate<EltStreamRDF> prematureDispatch, Logger logger) {
this.sources = sources;
this.queue = queue;
this.chunkSize = chunkSize;
this.prematureDispatch = prematureDispatch;
this.logger = logger;
}
@Override
public void run() {
initDestination();
try {
start();
int n = sources.size();
for (int i = 0; i < n; ++i) {
RDFParserBuilder parser = sources.get(i);
parse(parser);
}
} finally {
finish();
}
if (logger.isDebugEnabled()) {
logger.debug("Finish parsing");
}
}
/** Sets the destination into ABORTING state (if it was running).
* This triggers transition into ABORT state on the next chunk. */
public void abort() {
destinationState.compareAndSet(RUNNING, ABORTING);
}
private void initDestination() {
// -- Parser thread setup
Consumer<List<EltStreamRDF>> destination = batch -> {
if (destinationState.get() == RUNNING) {
try {
queue.put(batch);
} catch (InterruptedException ex) {
// After interrupt we may be in ABORTING state - therefore
// check whether to transition into ABORT state
handleAbortingState();
// FmtLog.error(LOG, ex, "Error: %s", ex.getMessage());
throw new RuntimeException(ex);
}
} else {
handleAbortingState();
throw new RuntimeException(new InterruptedException());
}
};
this.batcher = new EltStreamBatcher<>(destination, END, chunkSize);
Consumer<EltStreamRDF> eltSink = batcher;
// Take care of a custom dispatch policy
if (prematureDispatch != null) {
eltSink = elt -> {
boolean dispatchImmediately = prematureDispatch.test(elt);
batcher.accept(elt);
if (dispatchImmediately) {
batcher.flush();
}
};
}
this.generatorStreamRdf = new StreamToElements(eltSink);
}
private void start() {
batcher.startBatching();
if (logger.isDebugEnabled()) {
logger.debug("Start parsing");
}
}
private void parse(RDFParserBuilder parser) {
try {
// If an error occured then all parser are invoked anyway because any
// resources they own need yet to be closed.
// At this point, however, the parser's sink will always fail and
// any further errors will be suppressed.
StreamRDF sink = errorEncountered
? alwaysFailingStreamRdf
: generatorStreamRdf;
parser.parse(sink);
} catch (RuntimeException ex) {
Throwable cause = ex.getCause();
if (errorEncountered) {
if (logger.isDebugEnabled()) {
logger.debug("Suppressed exception", ex);
}
} else {
if (cause instanceof InterruptedException) {
if (logger.isDebugEnabled()) {
logger.debug("Parsing was interrupted");
}
} else {
// Parse error.
EltStreamRDF elt = EltStreamRDF.exception(ex);
batcher.accept(elt);
}
errorEncountered = true;
}
} catch (Throwable throwable) {
if (errorEncountered) {
if (logger.isDebugEnabled()) {
logger.debug("Suppressed exception", throwable);
}
} else {
// Very bad! Wrapping with runtime exception to track this location
EltStreamRDF elt = EltStreamRDF.exception(new RuntimeException(throwable));
batcher.accept(elt);
errorEncountered = true;
}
}
}
private void finish() {
try {
// If we are already in ABORTED state then finishBatching will do nothing but raise
// an exception
// If we are in ABORTING state we still need to dispatch another batch in order to transition into ABORT state
if (destinationState.get() != STOPPED) {
batcher.finishBatching();
}
} catch(Throwable ex) {
if (errorEncountered) {
if (logger.isDebugEnabled()) {
logger.debug("Suppressed exception", ex);
}
} else {
Throwable cause = ex.getCause();
if (cause instanceof InterruptedException) {
if (logger.isDebugEnabled()) {
logger.debug("Parsing was interrupted");
}
} else {
// Rethrow if something else went wrong
throw new RuntimeException(ex);
}
}
} finally {
// We are done; dispatching any further chunks is now an error and raises an exception
destinationState.set(STOPPED);
}
}
/** Check */
private void handleAbortingState() {
if (destinationState.compareAndSet(ABORTING, STOPPED)) {
// If we are transitioning into ABORT state then force putting
// the poison on the queue - regardless of further interruptions!
while (true) {
try {
queue.clear();
queue.put(END);
break;
} catch (InterruptedException e) {
continue;
}
}
}
}
}
/** Returns a runnable for stopping the parse process */
static Runnable startParserThread(
Logger logger, List<RDFParserBuilder> sources,
BlockingQueue<List<EltStreamRDF>> queue,
int chunkSize,
Predicate<EltStreamRDF> prematureDispatch,
boolean daemonMode) {
Task task = new Task(sources, queue, chunkSize, prematureDispatch, logger);
// Ensures runnable has started.
// ThreadLib.async(task) does not ensure runnable has started.
Thread parserThread = new Thread(task, "AsyncParser");
parserThread.setDaemon(daemonMode);
parserThread.start();
return () -> {
task.abort();
// Interrupt the parsing thread
// Note that InputStreams and ByteChannels may close themselves
// when interrupted while reading
parserThread.interrupt();
try {
// Wait for the thread to terminate so that all is clean when we return from close()
// Note: Some AsyncParser unit tests assert that the number of threads
// before and after parsing match up (within tolerance) - so joining is essential.
parserThread.join();
} catch (Exception e) {
throw new RuntimeException(e);
}
};
}
private static void dispatch(EltStreamRDF elt, StreamRDF stream) {
switch (elt.getType()) {
case TRIPLE: stream.triple(elt.triple()); break;
case QUAD: stream.quad(elt.quad()); break;
case PREFIX: stream.prefix(elt.prefix(), elt.iri()); break;
case BASE: stream.base(elt.iri()); break;
case EXCEPTION: raiseException(elt.exception()); break;
default: throw new InternalErrorException("Bad EltStreamRDF");
}
}
/** Receiver. Take chunks off the queue and send to the output StreamRDF. */
static void receiver(Runnable closeAction, Logger LOG2, BlockingQueue<List<EltStreamRDF>> queue, StreamRDF output) {
// -- Receiver thread
int count = 0;
// Receive.
try {
for(;;) {
List<EltStreamRDF> batch = queue.take();
if ( batch == END ) {
FmtLog.debug(LOG2, "Receive: END (%,d)", count);
break;
}
count += batch.size();
if ( LOG.isDebugEnabled() )
FmtLog.debug(LOG2, "Receive: Batch : %,d (%,d)", batch.size(), count);
dispatch(batch, output);
}
} catch (InterruptedException e) {
FmtLog.error(LOG2, e, "Interrupted");
} finally {
// The close action not only stops the parser but also waits
// for its thread to finish
closeAction.run();
}
}
private static void dispatch(List<EltStreamRDF> batch, StreamRDF stream) {
for ( EltStreamRDF elt : batch )
dispatch(elt, stream);
}
/** Convert a Stream into EltStreamRDF */
private static class StreamToElements implements StreamRDF {
private final Consumer<EltStreamRDF> destination;
public StreamToElements(Consumer<EltStreamRDF> destination) {
this.destination = destination;
}
@Override
public void start() { /* nothing to do */ }
@Override
public void finish() { /* nothing to do */ }
@Override
public void triple(Triple triple) {
EltStreamRDF elt = EltStreamRDF.triple(triple);
deliver(elt);
}
@Override
public void quad(Quad quad) {
EltStreamRDF elt = EltStreamRDF.quad(quad);
deliver(elt);
}
@Override
public void base(String base) {
EltStreamRDF elt = EltStreamRDF.base(base);
deliver(elt);
}
@Override
public void prefix(String prefix, String iri) {
EltStreamRDF elt = EltStreamRDF.prefix(prefix, iri);
deliver(elt);
}
private void deliver(EltStreamRDF elt) {
destination.accept(elt);
}
}
/** Batch items from a StreamRDF */
private static class EltStreamBatcher<T> implements Consumer<T> {
private final int batchSize;
private List<T> elements = null;
private final Consumer<List<T>> batchDestination;
private final List<T> endMarker;
public EltStreamBatcher(Consumer<List<T>> batchDestination, List<T> endMarker, int batchSize) {
this.batchDestination = batchDestination;
this.batchSize = batchSize;
this.endMarker = endMarker;
}
public void startBatching() { /* nothing to do */ }
/** Immediately dispatch any available data. */
public void flush() {
if (elements != null) {
dispatch(elements);
elements = null;
}
}
public void finishBatching() {
try {
flush();
} finally {
dispatch(endMarker);
}
}
private <X> boolean isEmpty(List<X> list) {
return list == null || list.isEmpty();
}
/** The added element is included in the dispatched batch when triggered */
@Override
public void accept(T elt) {
if ( elements == null )
elements = allocChunk();
elements.add(elt);
maybeDispatch();
}
private void maybeDispatch() {
long x = elements.size();
if ( x < batchSize )
return;
dispatch(elements);
elements = null;
}
private int count = 0 ;
private void dispatch(List<T> batch) {
count += batch.size();
batchDestination.accept(batch);
}
private List<T> allocChunk() {
return new ArrayList<>(batchSize);
}
}
}