package org.apache.rya.accumulo.mr;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.api.resolver.RyaTripleContext;
import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFHandler;
import org.eclipse.rdf4j.rio.RDFHandlerException;
import org.eclipse.rdf4j.rio.RDFParseException;
import org.eclipse.rdf4j.rio.RDFParser;
import org.eclipse.rdf4j.rio.Rio;
/**
* {@link FileInputFormat} that can read multiple RDF files and convert into
* statements.
* <p>
* Expects all files to use the same RDF serialization format, which must be
* provided.
* <p>
* Reading and parsing are done asynchronously, so entire files need not be
* loaded into memory at once. Reading will block when a character buffer is
* full, waiting for the parser to consume more data. The parser will block when
* a statement buffer is full, waiting for the client to consume the statements
* generated so far. This makes it possible to process large files, and is
* particularly useful for the N-Triples and N-Quads formats, which can be
* parsed one line at a time. The size of each buffer is configurable, as is
* the timeout: an error is thrown if the parser takes too long to respond.
* <p>
* Only N-Triples and N-Quads files may be split into multiple
* {@link InputSplit}s per file, if large enough. Input is read line-by-line,
* and each line of an N-Triples or N-Quads file is self-contained, so any
* arbitrary split is valid. This means the number of input splits may be
* greater than the number of input files if and only if N-Triples or N-Quads is
* given as the RDF serialization format.
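* <p>
* A minimal driver sketch (the input path and job name here are placeholders;
* mapper and output configuration are omitted):
* <pre>{@code
* Job job = Job.getInstance(conf, "rdf import");
* job.setInputFormatClass(RdfFileInputFormat.class);
* RdfFileInputFormat.addInputPath(job, new Path("/data/triples.nt"));
* RdfFileInputFormat.setRDFFormat(job, RDFFormat.NTRIPLES);
* RdfFileInputFormat.setCharBufferSize(job, 1024 * 1024); // optional
* RdfFileInputFormat.setStatementBufferSize(job, 1024);   // optional
* RdfFileInputFormat.setTimeout(job, 20);                 // optional
* }</pre>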
*/
public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> {
private static final Logger logger = Logger.getLogger(RdfFileInputFormat.class);
private static final String PREFIX = RdfFileInputFormat.class.getSimpleName();
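// Configuration property names are derived from this class's simple name,
// e.g. "RdfFileInputFormat.rdf.format"; populate them via the static setter
// methods below.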
private static final String CHAR_BUFFER_SIZE_PROP = PREFIX + ".char.buffer.size";
private static final String STATEMENT_BUFFER_SIZE_PROP = PREFIX + ".statement.buffer.size";
private static final String TIMEOUT_PROP = PREFIX + ".timeout";
private static final String FORMAT_PROP = PREFIX + ".rdf.format";
private static final RDFFormat DEFAULT_RDF_FORMAT = RDFFormat.RDFXML;
private static final int DEFAULT_CHAR_BUFFER_SIZE = 1024*1024;
private static final int DEFAULT_STATEMENT_BUFFER_SIZE = 1024;
private static final int DEFAULT_TIMEOUT = 20;
static final RyaStatementWritable DONE = new RyaStatementWritable(null, null); // signals the end of input
static final RyaStatementWritable ERROR = new RyaStatementWritable(null, null); // signals some error
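// nextKeyValue() compares against these sentinels by reference identity, so
// only these exact instances (not equal copies) terminate iteration.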
/**
* Set the RDF serialization format to parse. All input files must have the
* same format.
* @param job Job to apply the setting to
* @param format Format of any and all input files
*/
public static void setRDFFormat(Job job, RDFFormat format) {
job.getConfiguration().set(FORMAT_PROP, format.getName());
}
/**
* Specify the size, in characters, of the input buffer: up to this many
* characters are held in memory before file input blocks.
* @param job Job to apply the setting to
* @param size Input buffer size, in characters
*/
public static void setCharBufferSize(Job job, int size) {
job.getConfiguration().setInt(CHAR_BUFFER_SIZE_PROP, size);
}
/**
* Specify the size, in statements, of the parser output buffer: up to this
* many statements are held in memory before the parser blocks.
* @param job Job to apply the setting to
* @param size Statement buffer size, in statements
*/
public static void setStatementBufferSize(Job job, int size) {
job.getConfiguration().setInt(STATEMENT_BUFFER_SIZE_PROP, size);
}
/**
* Specify the parser timeout: how many seconds to wait for the parser
* thread to produce the next statement before aborting.
* @param job Job to apply the setting to
* @param seconds Timeout in seconds
*/
public static void setTimeout(Job job, int seconds) {
job.getConfiguration().setInt(TIMEOUT_PROP, seconds);
}
private RDFFormat getRDFFormat(JobContext context) {
String name = context.getConfiguration().get(FORMAT_PROP);
return RdfFormatUtils.getRdfFormatFromName(name);
}
/**
* Determine whether an input file can be split. If the input format is
* configured to be anything other than N-Triples or N-Quads, then the
* structure of the file is important and it cannot be split arbitrarily.
* Otherwise, default to the superclass logic to determine whether splitting
* is appropriate.
* @return true if configured to use a line-based input format and the
* superclass implementation returns true.
*/
@Override
protected boolean isSplitable(JobContext context, Path filename) {
RDFFormat rdfFormat = getRDFFormat(context);
if (RDFFormat.NTRIPLES.equals(rdfFormat) || RDFFormat.NQUADS.equals(rdfFormat)) {
return super.isSplitable(context, filename);
}
return false;
}
/**
* Instantiate a RecordReader for a given task attempt.
* @param inputSplit Input split to handle; may refer to part or all
* of an RDF file
* @param taskAttemptContext Contains configuration options.
* @return A RecordReader that reads and parses RDF text.
*/
@Override
public RecordReader<LongWritable, RyaStatementWritable> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) {
Configuration conf = taskAttemptContext.getConfiguration();
RDFFormat format = getRDFFormat(taskAttemptContext);
if (format == null) {
format = DEFAULT_RDF_FORMAT;
}
int charBufferSize = conf.getInt(CHAR_BUFFER_SIZE_PROP, DEFAULT_CHAR_BUFFER_SIZE);
int statementBufferSize = conf.getInt(STATEMENT_BUFFER_SIZE_PROP, DEFAULT_STATEMENT_BUFFER_SIZE);
int timeoutSeconds = conf.getInt(TIMEOUT_PROP, DEFAULT_TIMEOUT);
return new RdfFileRecordReader(format, charBufferSize, statementBufferSize, timeoutSeconds);
}
/**
* Reads RDF files and generates RyaStatementWritables. Reads and parses
* data in parallel, so the entire file need not be loaded at once.
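* <p>
* Data flows through two internal threads joined by bounded buffers:
* <pre>
* reader thread --(PipedWriter/PipedReader, charBufferSize chars)--> parser thread
* parser thread --(statementCache, statementBufferSize statements)--> nextKeyValue()
* </pre>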
*/
class RdfFileRecordReader extends RecordReader<LongWritable, RyaStatementWritable> implements RDFHandler {
private RecordReader<Text, Text> lineReader;
private final PipedWriter pipeOut;
private final PipedReader pipeIn;
private final RDFParser rdfParser;
final BlockingQueue<RyaStatementWritable> statementCache;
private long lineCount = 0;
private long statementCount = 0;
private RyaTripleContext tripleContext;
private RyaStatementWritable nextStatement = null;
private int timeoutSeconds;
private boolean noMoreStatements = false;
Thread readerThread;
Thread parserThread;
private Exception threadException;
/**
* Instantiates the RecordReader.
* @param format RDF serialization format to parse.
* @param charBufferSize Number of input characters to hold in
* memory; if exceeded, wait until the parser
* thread consumes some text before proceeding
* with reading input.
* @param statementBufferSize Number of output statements to hold in
* memory; if exceeded, wait until the
* client consumes data before proceeding
* with parsing.
* @param timeoutSeconds Number of seconds to wait for the parser
* thread to provide the next statement (or
* state that there are none). If exceeded,
* abort.
*/
RdfFileRecordReader(RDFFormat format, int charBufferSize, int statementBufferSize, int timeoutSeconds) {
rdfParser = Rio.createParser(format);
rdfParser.setRDFHandler(this);
statementCache = new LinkedBlockingQueue<RyaStatementWritable>(statementBufferSize);
pipeOut = new PipedWriter();
pipeIn = new PipedReader(charBufferSize);
this.timeoutSeconds = timeoutSeconds;
logger.info("Initializing RecordReader with parameters:");
logger.info("\tRDF serialization format = " + format.getName());
logger.info("\tinput buffer size = " + charBufferSize + " characters");
logger.info("\tstatement cache size = " + statementBufferSize);
logger.info("\tparser timeout = " + timeoutSeconds + " seconds");
}
/**
* Starts up one thread for reading text data (via an internal
* line-based RecordReader) and one thread for receiving and parsing that
* data, each blocking when its buffer is full.
* @param inputSplit The section of data to read
* @param taskAttemptContext Contains configuration variables
* @throws IOException if an error is encountered initializing the line
* RecordReader or piping its output to the parser thread.
* @throws InterruptedException if an error is encountered initializing
* the line RecordReader
*/
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
Configuration conf = taskAttemptContext.getConfiguration();
lineReader = new KeyValueLineRecordReader(conf);
lineReader.initialize(inputSplit, taskAttemptContext);
tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
pipeIn.connect(pipeOut);
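// The reader thread writes raw text into pipeOut and the parser thread
// consumes it from pipeIn; the pipe's buffer (charBufferSize characters)
// provides the backpressure between the two.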
readerThread = new Thread(Thread.currentThread().getName() + " -- reader thread") {
@Override
public void run() {
try {
logger.info("Starting file reader");
while (lineReader.nextKeyValue()) {
Text key = lineReader.getCurrentKey();
Text value = lineReader.getCurrentValue();
pipeOut.write(key.toString());
if (value.getLength() > 0) {
// KeyValueLineRecordReader splits each input line at the first
// separator (tab by default). Note that KEY_VALUE_SEPERATOR names the
// configuration property for that separator rather than being the
// separator itself, so reconstruct the line using the configured value.
pipeOut.write(conf.get(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t"));
pipeOut.write(value.toString());
}
pipeOut.write('\n');
lineCount++;
}
logger.info("Reached end of input text; read " + lineCount + " lines in total");
} catch (IOException | InterruptedException e) {
logger.error("Error processing line " + (lineCount+1) + " of input", e);
fail(e, this);
throw new RuntimeException(e.getMessage(), e);
}
finally {
try { lineReader.close(); } catch (IOException e) { logger.warn(e); }
try { pipeOut.close(); } catch (IOException e) { logger.warn(e); }
}
}
};
parserThread = new Thread(Thread.currentThread().getName() + " -- parser thread") {
@Override
public void run() {
try {
logger.info("Starting parser");
rdfParser.parse(pipeIn, "");
}
catch (RDFHandlerException | RDFParseException | IOException e) {
logger.error(e.getMessage(), e);
fail(e, this);
throw new RuntimeException(e.getMessage(), e);
}
finally {
try { pipeIn.close(); } catch (IOException e) { logger.warn(e); }
}
}
};
readerThread.start();
parserThread.start();
}
private void fail(Exception e, Thread source) {
// Notify the main RecordReader of the error
statementCache.offer(ERROR);
threadException = e;
// Kill the reader thread if necessary
if (source != readerThread && readerThread.isAlive()) {
readerThread.interrupt();
}
// Kill the parser thread if necessary
if (source != parserThread && parserThread.isAlive()) {
parserThread.interrupt();
}
}
/**
* Loads the next statement, if there is one, and returns whether there
* is one. Receives statements from the parser thread via a blocking
* queue.
* @throws InterruptedException if interrupted while waiting for a
* statement to show up in the queue.
* @throws IOException if the parser thread doesn't respond after the
* configured timeout, or if any thread reports an error.
* @return true if a valid statement was loaded, or false if there are
* no more statements in this input split.
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (noMoreStatements) {
return false;
}
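// Block until the parser thread delivers a statement or a sentinel,
// giving up after the configured timeout.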
nextStatement = statementCache.poll(timeoutSeconds, TimeUnit.SECONDS);
if (nextStatement == null) {
throw new IOException("Parser neither sending results nor signaling end of data after "
+ timeoutSeconds + " seconds.");
}
else if (nextStatement == DONE) {
logger.info("Reached end of parsed RDF; read " + statementCount + " statements in total.");
nextStatement = null;
noMoreStatements = true;
return false;
}
else if (nextStatement == ERROR) {
nextStatement = null;
noMoreStatements = true;
throw new IOException("Error detected processing input.", threadException);
}
statementCount++;
return true;
}
/**
* Gets the current key.
* @return the number of statements read so far, or null if all input
* has been read.
*/
@Override
public LongWritable getCurrentKey() {
if (noMoreStatements) {
return null;
}
return new LongWritable(statementCount);
}
/**
* Gets the current value.
* @return a RyaStatementWritable loaded from RDF data, or null if all
* input has been read.
*/
@Override
public RyaStatementWritable getCurrentValue() {
return nextStatement;
}
/**
* Gets the progress of the underlying line-based RecordReader. Does
* not include any information about the progress of the parser.
* @return The proportion of text input that has been read.
* @throws IOException if thrown by the internal RecordReader.
* @throws InterruptedException if thrown by the internal RecordReader.
*/
@Override
public float getProgress() throws IOException, InterruptedException {
return lineReader.getProgress();
}
/**
* Closes all the underlying resources.
*/
@Override
public void close() {
if (parserThread.isAlive()) {
parserThread.interrupt();
}
if (readerThread.isAlive()) {
readerThread.interrupt();
}
try { lineReader.close(); } catch (IOException e) { logger.warn(e); }
try { pipeOut.close(); } catch (IOException e) { logger.warn(e); }
try { pipeIn.close(); } catch (IOException e) { logger.warn(e); }
}
/**
* Has no effect.
*/
@Override
public void startRDF() throws RDFHandlerException {
}
/**
* Add a dummy item to the queue to signal that there will be no more
* statements.
* @throws RDFHandlerException if interrupted while waiting for
* the blocking queue to be ready to accept the done signal.
*/
@Override
public void endRDF() throws RDFHandlerException {
logger.info("Finished parsing RDF");
try {
statementCache.put(DONE);
} catch (InterruptedException e) {
throw new RDFHandlerException("Interrupted while waiting to add done signal to statement queue", e);
}
}
/**
* Has no effect.
*/
@Override
public void handleNamespace(String s, String s1) throws RDFHandlerException {
}
/**
* Convert the {@link Statement} to a {@link RyaStatement}, wrap it in a
* {@link RyaStatementWritable}, and add it to the queue.
* @throws RDFHandlerException if interrupted while waiting for the
* blocking queue to be ready to accept statement data.
*/
@Override
public void handleStatement(Statement statement) throws RDFHandlerException {
try {
statementCache.put(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement), tripleContext));
} catch (InterruptedException e) {
throw new RDFHandlerException("Interrupted while waiting to add parsed statement to the statement queue", e);
}
}
/**
* Has no effect.
*/
@Override
public void handleComment(String s) {
}
}
}