blob: b2fca7b9a2e68cb5907eb896d2d7c921808b0dd8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.jena.riot.lang;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import org.apache.jena.atlas.lib.Closeable;
import org.apache.jena.riot.RiotException;
import org.apache.jena.riot.system.PrefixMap;
import org.apache.jena.riot.system.PrefixMapFactory;
* <p>
* A {@code PipedRDFIterator} should be connected to a {@link PipedRDFStream}
* implementation; the piped iterator then provides whatever RDF primitives are
* written to the {@code PipedRDFStream}
* </p>
* <p>
* Typically, data is read from a {@code PipedRDFIterator} by one thread (the
* consumer) and data is written to the corresponding {@code PipedRDFStream} by
* some other thread (the producer). Attempting to use both objects from a
* single thread is not recommended, as it may deadlock the thread. The
* {@code PipedRDFIterator} contains a buffer, decoupling read operations from
* write operations, within limits.
* </p>
* <p>
* Inspired by Java's {@link java.io.PipedInputStream} and
* {@link java.io.PipedOutputStream}
* </p>
* @param <T>
* The type of the RDF primitive, should be one of {@code Triple},
* {@code Quad}, or {@code Tuple<Node>}
* @see PipedTriplesStream
* @see PipedQuadsStream
* @see PipedTuplesStream
public class PipedRDFIterator<T> implements Iterator<T>, Closeable {
* Constant for default buffer size
public static final int DEFAULT_BUFFER_SIZE = 10000;
* Constant for default poll timeout in milliseconds, used to stop the
* consumer deadlocking in certain circumstances
public static final int DEFAULT_POLL_TIMEOUT = 1000; // one second
* Constant for max number of failed poll attempts before the producer will
* be declared as dead
public static final int DEFAULT_MAX_POLLS = 10;
/** Bounded buffer of pending items; {@link #hasNext()} polls from it. */
private final BlockingQueue<T> queue;

/**
 * Sentinel that {@link #hasNext()} recognises (by identity) as the end of
 * the stream. The cast is unchecked but harmless: generics are erased at
 * runtime and {@code hasNext()} clears the slot rather than ever handing
 * this object out as a {@code T}.
 */
@SuppressWarnings("unchecked")
private final T endMarker = (T) new Object();

/** Set by {@link #close()}, and by {@link #hasNext()} when it gives up on the producer. */
private volatile boolean closedByConsumer = false;
/** Set by {@link #finish()}. */
private volatile boolean closedByProducer = false;
/** Set once {@link #hasNext()} has seen the end marker. */
private volatile boolean finished = false;
/** Set by {@link #hasNext()} when producer and consumer are observed to be the same thread. */
private volatile boolean threadReused = false;
/** Thread that most recently called {@link #hasNext()}. */
private volatile Thread consumerThread;
/** Thread that most recently called {@link #start()} or {@link #receive(Object)}. */
private volatile Thread producerThread;

// Set by connect(); hasNext() refuses to run until this is true.
// NOTE(review): unlike the flags above this is not volatile - confirm that
// connect() always happens-before the first hasNext() call.
private boolean connected = false;
/** How long a single poll waits for data, in milliseconds. */
private int pollTimeout = DEFAULT_POLL_TIMEOUT;
/** Number of empty polls tolerated by one {@link #hasNext()} call. */
private int maxPolls = DEFAULT_MAX_POLLS;

/** Item fetched by {@link #hasNext()} but not yet returned by {@link #next()}; {@code null} when empty. */
private T slot;

private final Object lock = new Object(); // protects baseIri and prefixes
private String baseIri;
private final PrefixMap prefixes = PrefixMapFactory.create();
/**
 * Creates a new piped RDF iterator with the default buffer size of
 * {@link #DEFAULT_BUFFER_SIZE}
 * <p>
 * Buffer size must be chosen carefully in order to avoid performance
 * problems, if you set the buffer size too low you will experience a lot of
 * blocked calls so it will take longer to consume the data from the
 * iterator. For best performance the buffer size should be at least 10% of
 * the expected input size though you may need to tune this depending on how
 * fast your consumer thread is.
 * </p>
 */
public PipedRDFIterator() {
    // Delegate straight to the full constructor so that the final queue
    // field is always initialised; non-fair locking and the default poll
    // settings are used.
    this(DEFAULT_BUFFER_SIZE, false, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
}
/**
 * Creates a new piped RDF iterator
 * <p>
 * Buffer size must be chosen carefully in order to avoid performance
 * problems, if you set the buffer size too low you will experience a lot of
 * blocked calls so it will take longer to consume the data from the
 * iterator. For best performance the buffer size should be roughly 10% of
 * the expected input size though you may need to tune this depending on how
 * fast your consumer thread is.
 * </p>
 *
 * @param bufferSize
 *            Buffer size
 */
public PipedRDFIterator(int bufferSize) {
    // Non-fair locking plus the default poll timeout and poll limit.
    this(bufferSize, false, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
}
/**
 * Creates a new piped RDF iterator
 * <p>
 * Buffer size must be chosen carefully in order to avoid performance
 * problems, if you set the buffer size too low you will experience a lot of
 * blocked calls so it will take longer to consume the data from the
 * iterator. For best performance the buffer size should be roughly 10% of
 * the expected input size though you may need to tune this depending on how
 * fast your consumer thread is.
 * </p>
 * <p>
 * The fair parameter controls whether the locking policy used for the
 * buffer is fair. When enabled this reduces throughput but also reduces the
 * chance of thread starvation. This likely need only be set to {@code true}
 * if there will be multiple consumers.
 * </p>
 *
 * @param bufferSize
 *            Buffer size
 * @param fair
 *            Whether the buffer should use a fair locking policy
 */
public PipedRDFIterator(int bufferSize, boolean fair) {
    // Default poll timeout and poll limit.
    this(bufferSize, fair, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
}
/**
 * Creates a new piped RDF iterator
 * <p>
 * Buffer size must be chosen carefully in order to avoid performance
 * problems, if you set the buffer size too low you will experience a lot of
 * blocked calls so it will take longer to consume the data from the
 * iterator. For best performance the buffer size should be roughly 10% of
 * the expected input size though you may need to tune this depending on how
 * fast your consumer thread is.
 * </p>
 * <p>
 * The {@code fair} parameter controls whether the locking policy used for
 * the buffer is fair. When enabled this reduces throughput but also reduces
 * the chance of thread starvation. This likely need only be set to
 * {@code true} if there will be multiple consumers.
 * </p>
 * <p>
 * The {@code pollTimeout} parameter controls how long each poll attempt
 * waits for data to be produced. This prevents the consumer thread from
 * blocking indefinitely and allows it to detect various potential deadlock
 * conditions e.g. dead producer thread, another consumer closed the
 * iterator etc. and errors out accordingly. It is unlikely that you will
 * ever need to adjust this from the default value provided by
 * {@link #DEFAULT_POLL_TIMEOUT}.
 * </p>
 * <p>
 * The {@code maxPolls} parameter controls how many poll attempts will be
 * made by a single consumer thread within the context of a single call to
 * {@link #hasNext()} before the iterator declares the producer to be dead
 * and errors out accordingly. You may need to adjust this if you have a
 * slow producer thread or many consumer threads.
 * </p>
 *
 * @param bufferSize
 *            Buffer size
 * @param fair
 *            Whether the buffer should use a fair locking policy
 * @param pollTimeout
 *            Poll timeout in milliseconds
 * @param maxPolls
 *            Max poll attempts
 * @throws IllegalArgumentException
 *             if {@code pollTimeout} or {@code maxPolls} is not positive
 *             (a {@code bufferSize} below 1 is rejected the same way by
 *             {@code ArrayBlockingQueue})
 */
public PipedRDFIterator(int bufferSize, boolean fair, int pollTimeout, int maxPolls) {
    if (pollTimeout <= 0)
        throw new IllegalArgumentException("Poll Timeout must be > 0");
    if (maxPolls <= 0)
        throw new IllegalArgumentException("Max Poll attempts must be > 0");
    this.queue = new ArrayBlockingQueue<>(bufferSize, fair);
    this.pollTimeout = pollTimeout;
    this.maxPolls = maxPolls;
}
public boolean hasNext() {
if (!connected)
throw new IllegalStateException("Pipe not connected");
if (closedByConsumer)
throw new RiotException("Pipe closed");
if (finished)
return false;
consumerThread = Thread.currentThread();
// Depending on how code and/or the JVM schedules the threads involved
// there is a scenario that exists where a producer can finish/die
// before theconsumer is started and the consumer is scheduled onto the
// same thread thus resulting in a deadlock on the consumer because it
// will never be able to detect that the producer died
// In this scenario we need to set a special flag to indicate the
// possibility
if (producerThread != null && producerThread == consumerThread)
threadReused = true;
if (slot != null)
return true;
int attempts = 0;
while (true) {
try {
slot = queue.poll(this.pollTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new CancellationException();
if (null != slot)
// If the producer thread died and did not call finish() then
// declare this pipe to be "broken"
// Since check is after the break, we will drain as much as possible
// out of the queue before throwing this exception
if (threadReused || (producerThread != null && !producerThread.isAlive() && !closedByProducer)) {
closedByConsumer = true;
throw new RiotException("Producer dead");
// Need to check this inside the loop as otherwise outside code that
// attempts to break the deadlock by causing close() on the iterator
// cannot do so
if (closedByConsumer)
throw new RiotException("Pipe closed");
// Need to check whether polling attempts have been exceeded
// If so declare the producer dead and exit
if (attempts >= this.maxPolls) {
closedByConsumer = true;
if (producerThread != null) {
throw new RiotException(
"Producer failed to produce any data within the specified number of polling attempts, declaring producer dead");
} else {
throw new RiotException("Producer failed to ever call start(), declaring producer dead");
// When the end marker is seen set slot to null
if (slot == endMarker) {
finished = true;
slot = null;
return false;
return true;
public T next() {
if (!hasNext())
throw new NoSuchElementException();
T item = slot;
slot = null;
return item;
public void remove() {
throw new UnsupportedOperationException();
private void checkStateForReceive() {
if (closedByProducer || closedByConsumer) {
throw new RiotException("Pipe closed");
} else if (consumerThread != null && !consumerThread.isAlive()) {
throw new RiotException("Consumer dead");
protected void connect() {
this.connected = true;
protected void receive(T t) {
producerThread = Thread.currentThread();
try {
} catch (InterruptedException e) {
throw new CancellationException();
protected void base(String base) {
synchronized (lock) {
this.baseIri = base;
* Gets the most recently seen Base IRI
* @return Base IRI
public String getBaseIri() {
synchronized (lock) {
return baseIri;
protected void prefix(String prefix, String iri) {
synchronized (lock) {
prefixes.add(prefix, iri);
* Gets the prefix map which contains the prefixes seen so far in the stream
* @return Prefix Map
public PrefixMap getPrefixes() {
synchronized (lock) {
// Need to return a copy since PrefixMap is not concurrent
return PrefixMapFactory.create(this.prefixes);
* Should be called by the producer when it begins writing to the iterator.
* If the producer fails to call this for whatever reason and never produces
* any output or calls {@code finish()} consumers may be blocked for a short
* period before they detect this state and error out.
protected void start() {
// Track the producer thread in case it never delivers us anything and
// dies before calling finish
producerThread = Thread.currentThread();
* Should be called by the producer when it has finished writing to the
* iterator. If the producer fails to call this for whatever reason
* consumers may be blocked for a short period before they detect this state
* and error out.
protected void finish() {
if ( closedByProducer )
return ;
closedByProducer = true;
* May be called by the consumer when it is finished reading from the
* iterator, if the producer thread has not finished it will receive an
* error the next time it tries to write to the iterator
public void close() {
closedByConsumer = true;