blob: 17e7a64e8434c2b4030c6fb42660130dcf2c0cce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.stanbol.entityhub.indexing;
import static java.lang.System.exit;
import static java.lang.System.out;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implemented to allow importing the Musicbrainz dump to Jena TDB.<p>
*
* The problem is that RDF dumps with blank nodes do require to store a
* lookup table for the black nodes IDs during import.<p> In case a dump
* contains millions of such nodes this table does no longer fit into
* memory. This makes importing Dumps to with the Jena RDF parser impossible.
* <p>
* This Utility can replaces nodes that start with "_:{id}" with
* "<{prefix}{id}>. The prefix must be set with the "-p" parameter.
* <p>
* This tool supports "gz" and "bz2" compressed files. If the output will use
* the same compression as the input. It uses two threads for "reading/processing"
* and "writing". It could also process multiple files in parallel. However this
* feature is not yet activated as the Musicbrainz dump comes in a single file.
*
* @author Rupert Westenthaler
*
*/
public class Urify implements Runnable{
private static final String EOF_INDICATOR = "__EOF_INDICATOR";
private static Logger log = LoggerFactory.getLogger(Urify.class);
private static final Options options;
static {
options = new Options();
options.addOption("h", "help", false, "display this help and exit");
options.addOption("p","prefix",true,
"The URI prefix used for wrapping the bNode Id");
options.addOption("e","encoding",true, "the char encodinf (default: UTF-8)");
options.addOption("o","outputFilePrefix",true, "The prefix to add to output files, defaults to \"uf_\"");
}
/**
* @param args
* @throws ParseException
*/
public static void main(String[] args) throws IOException, ParseException {
CommandLineParser parser = new PosixParser();
CommandLine line = parser.parse(options, args);
args = line.getArgs();
if (line.hasOption('h')) {
out.println("Processes RDF files to translate blank nodes into prefixed URI nodes.");
out.println("-h/--help: Print this help and exit.");
out.println("-p/--prefix: Required: The prefix to add to blank nodes to make them URIs.");
out.println("-e/--encoding: The text encoding to expect in the RDF, defaults to UTF-8.");
out.println("-o/--outputFilePrefix: The prefix to add to output files, defaults to \"uf_\".");
exit(0);
}
if(!line.hasOption('p')){
log.error("Missing parameter 'prefix' ('p)!");
exit(1);
}
String prefix = "<"+line.getOptionValue('p');
log.info("Using prefix: {} ",line.getOptionValue('p'));
Charset charset;
if(line.hasOption('e')){
charset = Charset.forName(line.getOptionValue('e'));
if(charset == null){
log.error("Unsupported encoding '{}'!",line.getOptionValue('e'));
exit(1);
}
} else {
charset = Charset.forName("UTF-8");
}
log.info("charset: {} ",charset.name());
Urify urify = new Urify(Arrays.asList(args), charset, prefix,
line.hasOption('o') ? line.getOptionValue('o') : "uf_");
urify.run(); //TODO: this could support processing multiple files in parallel
}
private final Charset charset;
private final String prefix;
private final String outputFilePrefix;
protected long start = System.currentTimeMillis();
protected long uf_count = 0;
private List<String> resources;
public Urify(List<String> resources, Charset charset, String prefix,
final String outputFilePrefix) throws IOException {
this.charset = charset;
this.prefix = prefix;
this.outputFilePrefix = outputFilePrefix;
this.resources = Collections.synchronizedList(new ArrayList<String>(
resources));
}
public void run() {
String source;
do {
synchronized (resources) {
if(resources.isEmpty()){
source = null;
} else {
source = resources.remove(0);
try {
urify(source);
} catch (Exception e) {
log.error("Unable to Urify "+resources,e);
}
}
}
} while (source != null);
}
private void urify(String resource) throws IOException {
File source = new File(resource);
if(source.isFile()){
String path = FilenameUtils.getFullPathNoEndSeparator(resource);
String name = FilenameUtils.getName(resource);
File target = new File(path, outputFilePrefix + name);
int i=0;
while(target.exists()){
i++;
target = new File(path,"uf"+i+"_"+name);
}
InputStream is = new FileInputStream(source);
OutputStream os = new FileOutputStream(target);
log.info("RDFTerm: {}",resource);
log.info("Target : {}",target);
if ("gz".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
is = new GZIPInputStream(is);
os = new GZIPOutputStream(os);
name = FilenameUtils.removeExtension(name);
log.debug(" - from GZIP Archive");
} else if ("bz2".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
is = new BZip2CompressorInputStream(is);
os = new BZip2CompressorOutputStream(os);
name = FilenameUtils.removeExtension(name);
log.debug(" - from BZip2 Archive");
}// TODO: No Zip File support
//else no complression
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1000);
ReaderDaemon reader = new ReaderDaemon(new BufferedReader(new InputStreamReader(is, charset)), queue);
WriterDaemon writer = new WriterDaemon(
new BufferedWriter(new OutputStreamWriter(os, charset)), queue);
Thread readerDaemon = new Thread(reader,name+" reader");
Thread writerDaemon = new Thread(writer,name+" writer");
readerDaemon.setDaemon(true);
writerDaemon.setDaemon(true);
writerDaemon.start();
readerDaemon.start();
Object notifier = writer.getNotifier();
synchronized (notifier) { //wait until processed
if(!writer.completed()){
try {
notifier.wait();
} catch (InterruptedException e) {/*ignore*/}
}
}
if(reader.getError() != null){
throw new IOException("Error while reading source "+source,reader.getError());
}
if(writer.getError() != null){
throw new IOException("Error while writing resource "+target,writer.getError());
}
log.info(" ... completed resource {}",resource);
} else {
throw new FileNotFoundException("Parsed File "+resource+" does not exist or is not a File!");
}
}
private class ReaderDaemon implements Runnable {
private final BufferedReader reader;
private final BlockingQueue<String> queue;
private Exception error;
protected ReaderDaemon(BufferedReader reader, BlockingQueue<String> queue){
this.reader = reader;
this.queue = queue;
}
public Exception getError() {
return error;
}
@Override
public void run() {
String triple;
try {
while((triple = reader.readLine()) != null){
StringBuilder sb = new StringBuilder();
StringTokenizer st = new StringTokenizer(triple," \t");
while(st.hasMoreElements()){
String node = st.nextToken();
if(node.startsWith("_:")){ //convert to uri
sb.append(prefix);
sb.append(node.substring(2,node.length()));
sb.append("> ");
uf_count++;
} else {
sb.append(node);
if(node.length()>1){
//the final '.' is also a node
sb.append(" ");
}
}
}
queue.put(sb.toString());
}
} catch (IOException e) {
error = e;
} catch (InterruptedException e) {
error = e;
} finally {
IOUtils.closeQuietly(reader);
try {
queue.put(EOF_INDICATOR); //indicates finished
} catch (InterruptedException e) {
log.error("Unable to put EOF to queue!",e);
}
}
}
}
private class WriterDaemon implements Runnable {
private final BufferedWriter writer;
private final BlockingQueue<String> queue;
private final Object notifier = new Object();
private boolean completed = false;
private Exception error;
protected WriterDaemon(BufferedWriter writer, BlockingQueue<String> queue){
this.writer = writer;
this.queue = queue;
}
public Exception getError() {
return error;
}
public Object getNotifier() {
return notifier;
}
public boolean completed() {
return completed;
}
@Override
public void run() {
String triple;
long count = 0;
boolean first = true;
try {
while(!EOF_INDICATOR.equals((triple = queue.take()))){
if(count % 1000000 == 0){
//NOTE: urified will not be correct as it is counted
// by an other thread, but for logging ...
long end = System.currentTimeMillis();
log.info("processed {} | urified: {} (batch: {}sec)",
new Object[]{count,uf_count,((double)(end-start))/1000});
start = end;
}
count++;
if(first){
first = false;
} else {
writer.write("\n");
}
writer.write(triple);
}
} catch (InterruptedException e) {
this.error = e;
} catch (IOException e) {
this.error = e;
} finally {
IOUtils.closeQuietly(writer);
this.completed = true;
synchronized (notifier) {
notifier.notifyAll();
}
}
}
}
}