blob: 15ae003e6aa7ffeee20e6bc3bc5f0575e83da855 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.any23.filter;
import org.apache.any23.extractor.ExtractionContext;
import org.apache.any23.writer.TripleHandler;
import org.apache.any23.writer.TripleHandlerException;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Value;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
* A wrapper around a {@link TripleHandler} that can block and unblock calls to the handler, either for the entire
* document, or for individual {@link ExtractionContext}s. A document is initially blocked and must be explicitly
* unblocked. Contexts are initially unblocked and must be explicitly blocked. Unblocking a document unblocks all
* contexts as well. This class is thread-safe.
*
* @author Richard Cyganiak (richard@cyganiak.de)
*/
public class ExtractionContextBlocker implements TripleHandler {

    /** Downstream handler that receives everything that is let through. */
    private final TripleHandler wrapped;

    /** One pipe per opened context, keyed by the context's unique ID. Guarded by {@code this}. */
    private final Map<String, ValvedTriplePipe> contextQueues = new HashMap<>();

    /** Set by {@link #startDocument(IRI)}, cleared by {@link #unblockDocument()}. Guarded by {@code this}. */
    private boolean documentBlocked;

    /**
     * Creates a blocker in front of the given handler.
     *
     * @param wrapped
     *            the handler that receives the events that are not (or no longer) blocked.
     */
    public ExtractionContextBlocker(TripleHandler wrapped) {
        this.wrapped = wrapped;
    }

    /**
     * @return {@code true} if the current document is still blocked.
     */
    public synchronized boolean isDocBlocked() {
        // Synchronized so that the read observes writes made under the same lock
        // by startDocument() and unblockDocument().
        return documentBlocked;
    }

    /**
     * Starts queueing the events of the given context instead of forwarding them. Does nothing once the document has
     * been unblocked.
     *
     * @param context
     *            a context previously passed to {@link #openContext(ExtractionContext)}.
     *
     * @throws IllegalStateException
     *             if the document is blocked and the context is not currently open.
     */
    public synchronized void blockContext(ExtractionContext context) {
        if (!documentBlocked) {
            return;
        }
        pipeFor(context).block();
    }

    /**
     * Forwards everything queued for the given context and lets subsequent events through directly.
     *
     * @param context
     *            a context previously passed to {@link #openContext(ExtractionContext)}.
     *
     * @throws IllegalStateException
     *             if the context is not currently open.
     * @throws RuntimeException
     *             if the wrapped handler fails while the queue is flushed.
     */
    public synchronized void unblockContext(ExtractionContext context) {
        try {
            pipeFor(context).unblock();
        } catch (ValvedTriplePipeException e) {
            throw new RuntimeException("Error while unblocking context", e);
        }
    }

    /**
     * Forwards the start of the document to the wrapped handler and marks the document as blocked.
     */
    public synchronized void startDocument(IRI documentIRI) throws TripleHandlerException {
        wrapped.startDocument(documentIRI);
        documentBlocked = true;
    }

    /**
     * Registers a new, initially unblocked pipe for the context. The wrapped handler is not notified here; it only
     * sees the context once the first event of that context is actually forwarded.
     */
    public synchronized void openContext(ExtractionContext context) throws TripleHandlerException {
        contextQueues.put(context.getUniqueID(), new ValvedTriplePipe(context));
    }

    public synchronized void closeContext(ExtractionContext context) {
        // Empty. We'll close all contexts when the document is finished.
    }

    /**
     * Unblocks the document and every context that is currently open. Does nothing if the document is already
     * unblocked.
     *
     * @throws RuntimeException
     *             if the wrapped handler fails while a queue is flushed.
     */
    public synchronized void unblockDocument() {
        if (!documentBlocked) {
            return;
        }
        documentBlocked = false;
        for (ValvedTriplePipe pipe : contextQueues.values()) {
            try {
                pipe.unblock();
            } catch (ValvedTriplePipeException e) {
                throw new RuntimeException("Error while unblocking context", e);
            }
        }
    }

    /**
     * Queues the triple if its context is blocked, otherwise forwards it to the wrapped handler.
     *
     * @throws IllegalStateException
     *             if the context is not currently open.
     * @throws TripleHandlerException
     *             if the wrapped handler rejects the triple.
     */
    public synchronized void receiveTriple(Resource s, IRI p, Value o, IRI g, ExtractionContext context)
            throws TripleHandlerException {
        try {
            pipeFor(context).receiveTriple(s, p, o, g);
        } catch (ValvedTriplePipeException e) {
            throw new TripleHandlerException(
                    String.format(Locale.ROOT, "Error while receiving triple %s %s %s", s, p, o), e);
        }
    }

    /**
     * Queues the namespace if its context is blocked, otherwise forwards it to the wrapped handler.
     *
     * @throws IllegalStateException
     *             if the context is not currently open.
     * @throws TripleHandlerException
     *             if the wrapped handler rejects the namespace.
     */
    public synchronized void receiveNamespace(String prefix, String uri, ExtractionContext context)
            throws TripleHandlerException {
        try {
            pipeFor(context).receiveNamespace(prefix, uri);
        } catch (ValvedTriplePipeException e) {
            throw new TripleHandlerException(
                    String.format(Locale.ROOT, "Error while receiving namespace %s:%s", prefix, uri), e);
        }
    }

    /**
     * Closes all open contexts and then the wrapped handler. The wrapped handler is closed even if closing the
     * contexts fails.
     */
    public synchronized void close() throws TripleHandlerException {
        try {
            closeDocument();
        } finally {
            wrapped.close();
        }
    }

    /**
     * Closes all open contexts, discarding whatever is still queued in blocked ones, and forwards the end of the
     * document to the wrapped handler.
     */
    public synchronized void endDocument(IRI documentIRI) throws TripleHandlerException {
        closeDocument();
        wrapped.endDocument(documentIRI);
    }

    public void setContentLength(long contentLength) {
        // Empty.
    }

    /**
     * Looks up the pipe of an open context. Callers must hold the lock on {@code this}.
     *
     * @throws IllegalStateException
     *             if no pipe is registered for the context.
     */
    private ValvedTriplePipe pipeFor(ExtractionContext context) {
        final String id = context.getUniqueID();
        final ValvedTriplePipe pipe = contextQueues.get(id);
        if (pipe == null) {
            throw new IllegalStateException("Context has not been opened: " + id);
        }
        return pipe;
    }

    /**
     * Closes every pipe and forgets all contexts. A failing pipe does not prevent the remaining ones from being
     * closed; the first failure is rethrown afterwards with later ones attached as suppressed.
     */
    private void closeDocument() {
        RuntimeException failure = null;
        try {
            for (ValvedTriplePipe pipe : contextQueues.values()) {
                try {
                    pipe.close();
                } catch (ValvedTriplePipeException e) {
                    if (failure == null) {
                        failure = new RuntimeException("Error closing document", e);
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
        } finally {
            // Always drop the pipes so that a failed document cannot leak into the next one.
            contextQueues.clear();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Checked wrapper for failures of the wrapped handler that occur inside a pipe. */
    private static class ValvedTriplePipeException extends Exception {
        private ValvedTriplePipeException(String s, Throwable throwable) {
            super(s, throwable);
        }
    }

    /**
     * Per-context valve: forwards events to the wrapped handler while open, queues them while blocked. Instances are
     * only touched from synchronized methods of the enclosing class and do no locking of their own.
     */
    private class ValvedTriplePipe {

        private final ExtractionContext context;

        // Queued triples, stored column-wise: index i of each list belongs to the same triple.
        private final List<Resource> subjects = new ArrayList<>();
        private final List<IRI> predicates = new ArrayList<>();
        private final List<Value> objects = new ArrayList<>();
        private final List<IRI> graphs = new ArrayList<>();

        // Queued namespaces, stored column-wise like the triples.
        private final List<String> prefixes = new ArrayList<>();
        private final List<String> uris = new ArrayList<>();

        private boolean blocked = false;

        /** True once the context has been opened on the wrapped handler (by a triple or a namespace). */
        private boolean contextOpened = false;

        ValvedTriplePipe(ExtractionContext context) {
            this.context = context;
        }

        void receiveTriple(Resource s, IRI p, Value o, IRI g) throws ValvedTriplePipeException {
            if (blocked) {
                subjects.add(s);
                predicates.add(p);
                objects.add(o);
                graphs.add(g);
            } else {
                sendTriple(s, p, o, g);
            }
        }

        void receiveNamespace(String prefix, String uri) throws ValvedTriplePipeException {
            if (blocked) {
                prefixes.add(prefix);
                uris.add(uri);
            } else {
                sendNamespace(prefix, uri);
            }
        }

        void block() {
            blocked = true;
        }

        /**
         * Opens the valve and flushes the queue, namespaces first, then triples, each in arrival order.
         */
        void unblock() throws ValvedTriplePipeException {
            if (!blocked) {
                return;
            }
            blocked = false;
            try {
                for (int i = 0; i < prefixes.size(); i++) {
                    sendNamespace(prefixes.get(i), uris.get(i));
                }
                for (int i = 0; i < subjects.size(); i++) {
                    sendTriple(subjects.get(i), predicates.get(i), objects.get(i), graphs.get(i));
                }
            } finally {
                // Empty the queue so that a later block()/unblock() cycle does not deliver
                // the same events to the wrapped handler a second time.
                clearQueue();
            }
        }

        /** Closes the context on the wrapped handler, but only if it was ever opened there. */
        void close() throws ValvedTriplePipeException {
            if (contextOpened) {
                try {
                    wrapped.closeContext(context);
                } catch (TripleHandlerException e) {
                    throw new ValvedTriplePipeException("Error while closing the triple handler", e);
                }
            }
        }

        private void clearQueue() {
            prefixes.clear();
            uris.clear();
            subjects.clear();
            predicates.clear();
            objects.clear();
            graphs.clear();
        }

        /** Opens the context on the wrapped handler right before the first event is forwarded. */
        private void ensureContextOpened() throws ValvedTriplePipeException {
            if (contextOpened) {
                return;
            }
            try {
                wrapped.openContext(context);
            } catch (TripleHandlerException e) {
                throw new ValvedTriplePipeException("Error while opening the triple handler", e);
            }
            contextOpened = true;
        }

        private void sendTriple(Resource s, IRI p, Value o, IRI g) throws ValvedTriplePipeException {
            ensureContextOpened();
            try {
                wrapped.receiveTriple(s, p, o, g, context);
            } catch (TripleHandlerException e) {
                throw new ValvedTriplePipeException("Error while receiving the triple", e);
            }
        }

        private void sendNamespace(String prefix, String uri) throws ValvedTriplePipeException {
            ensureContextOpened();
            try {
                wrapped.receiveNamespace(prefix, uri, context);
            } catch (TripleHandlerException e) {
                throw new ValvedTriplePipeException("Error while receiving the namespace", e);
            }
        }
    }
}