blob: 6d1aeeccee8133a65e1802a54b5ffb97c8dabb08 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.tika;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.io.ByteSource;
import com.google.common.io.CountingInputStream;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.commons.io.LazyInputStream;
import org.apache.jackrabbit.oak.plugins.blob.datastore.TextWriter;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.sax.WriteOutContentHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Extracts text from binaries on a fixed pool of worker threads and passes the outcome to a
 * {@link TextWriter}.
 *
 * <p>Callers feed binaries through {@link #extract(Iterable)}; the first call lazily creates the
 * bounded work queue, the {@link TikaHelper} and the worker pool. {@link #close()} signals the
 * workers to stop, waits for them to finish and logs the accumulated statistics.
 *
 * <p>Configuration setters ({@link #setTikaConfig(File)}, {@link #setThreadPoolSize(int)}) are
 * only read during initialization, so they must be called before the first {@code extract} call
 * to have any effect.
 */
class TextExtractor implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(TextExtractor.class);
    /** Dedicated logger for parser failures so they can be enabled/disabled separately. */
    private static final Logger parserError = LoggerFactory.getLogger("org.apache.jackrabbit.oak.plugins.tika.ParserError");
    /** Progress is logged each time the processed count is a multiple of this value. */
    private static final int PROGRESS_BATCH_SIZE = 1000;
    /** Default write limit handed to the content handler. */
    private static final int MAX_EXTRACT_LENGTH = 100000;
    /** Sentinel returned by {@link #parseStringValue} to indicate that parsing failed. */
    private static final String ERROR_TEXT = "TextExtractionError";

    private final TextWriter textWriter;
    /** Poison pill; compared by identity in the workers to detect shutdown. */
    private final WorkItem SHUTDOWN_SIGNAL = new WorkItem(null);
    private BlockingQueue<WorkItem> inputQueue;
    private ExecutorService executorService;
    private int threadPoolSize = Runtime.getRuntime().availableProcessors();
    private int queueSize = 100;

    // Counters are updated from the worker threads, hence the atomic types.
    private final AtomicInteger errorCount = new AtomicInteger();
    private final AtomicLong timeTaken = new AtomicLong();
    private final AtomicInteger extractionCount = new AtomicInteger();
    private final AtomicInteger textWrittenCount = new AtomicInteger();
    private final AtomicInteger parserErrorCount = new AtomicInteger();
    private final AtomicInteger processedCount = new AtomicInteger();
    private final AtomicInteger emptyCount = new AtomicInteger();
    private final AtomicInteger notSupportedCount = new AtomicInteger();
    private final AtomicInteger alreadyExtractedCount = new AtomicInteger();
    private final AtomicLong extractedTextSize = new AtomicLong();
    private final AtomicLong nonEmptyExtractedTextSize = new AtomicLong();
    private final AtomicLong totalSizeRead = new AtomicLong();

    private int maxExtractedLength = MAX_EXTRACT_LENGTH;
    private File tikaConfig;
    private TikaHelper tika;
    private boolean initialized;
    private BinaryStats stats;
    private boolean closed;

    /**
     * @param textWriter sink that receives extracted text as well as the "empty" and "error"
     *                   markers for each processed blob
     */
    public TextExtractor(TextWriter textWriter) {
        this.textWriter = textWriter;
    }

    /**
     * Queues every given binary for extraction, initializing the extractor on first use.
     * Blocks while the work queue is full.
     *
     * @param binaries binaries to process
     * @throws InterruptedException if interrupted while waiting for space in the work queue
     * @throws IOException          if initialization fails
     */
    public void extract(Iterable<BinaryResource> binaries) throws InterruptedException, IOException {
        initialize();
        for (BinaryResource binary : binaries) {
            inputQueue.put(new WorkItem(binary));
        }
    }

    /**
     * Signals the workers to stop once the queued work has been consumed, waits for them to
     * terminate and logs the statistics. Calling it more than once, or without ever having
     * called {@link #extract(Iterable)}, is safe.
     *
     * <p>If the calling thread is interrupted while waiting, the workers are interrupted too
     * and the interrupt status of the caller is restored.
     */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        // The queue and the pool are created lazily by initialize(); when extract() was never
        // called both are still null and there is nothing to drain or shut down.
        if (inputQueue != null && !inputQueue.isEmpty()) {
            log.info("Shutting down the extractor. Pending task count {}", inputQueue.size());
        }
        if (executorService != null) {
            try {
                inputQueue.put(SHUTDOWN_SIGNAL);
                executorService.shutdown();
                //Wait long enough
                executorService.awaitTermination(10, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                // The shutdown signal may not have been delivered (or honoured yet); without
                // interrupting them the workers would stay blocked on the queue indefinitely.
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        dumpStats();
        closed = true;
    }

    /**
     * @param tikaConfig configuration file passed to {@link TikaHelper} during initialization
     */
    public void setTikaConfig(File tikaConfig) {
        this.tikaConfig = tikaConfig;
    }

    /**
     * @param threadPoolSize number of worker threads to start during initialization
     */
    public void setThreadPoolSize(int threadPoolSize) {
        this.threadPoolSize = threadPoolSize;
    }

    /**
     * @param stats optional totals used to report percentages in the progress log
     */
    public void setStats(BinaryStats stats) {
        this.stats = stats;
    }

    /** Logs a multi-line summary of all counters at INFO level. */
    private void dumpStats() {
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        pw.println("Text extraction stats");
        pw.printf("\t Processed Count : %d%n", processedCount.get());
        pw.printf("\t Extraction Count : %d%n", extractionCount.get());
        pw.printf("\t Empty Count : %d%n", emptyCount.get());
        pw.printf("\t Text Written Count : %d%n", textWrittenCount.get());
        pw.printf("\t Parser Error Count : %d%n", parserErrorCount.get());
        pw.printf("\t Error Count : %d%n", errorCount.get());
        pw.printf("\t Not Supported Count : %d%n", notSupportedCount.get());
        pw.printf("\t Already processed Count : %d%n", alreadyExtractedCount.get());
        pw.printf("\t Total bytes read : %s%n", IOUtils.humanReadableByteCount(totalSizeRead.get()));
        pw.printf("\t Total text extracted : %s%n", IOUtils.humanReadableByteCount(extractedTextSize.get()));
        pw.printf("\t Non empty text : %s%n", IOUtils.humanReadableByteCount(nonEmptyExtractedTextSize.get()));
        pw.printf("\t Time taken : %d sec%n", timeTaken.get() / 1000);
        pw.close();
        log.info(sw.toString());
    }

    /**
     * Logs progress whenever {@code count} is a multiple of {@link #PROGRESS_BATCH_SIZE}.
     * Percentages are only included when {@link #setStats(BinaryStats)} was called.
     *
     * @param count number of binaries processed so far
     */
    private void dumpProgress(int count) {
        if (count % PROGRESS_BATCH_SIZE == 0) {
            String progress = "";
            if (stats != null) {
                double processedPercent = count * 1.0 / stats.getTotalCount() * 100;
                double indexedPercent = extractionCount.get() * 1.0 / stats.getIndexedCount() * 100;
                progress = String.format("(%1.2f%%) (Extraction stats %d/%d %1.2f%%, Ignored count %d)",
                        processedPercent, extractionCount.get(), stats.getIndexedCount(),
                        indexedPercent, notSupportedCount.get());
            }
            log.info("Processed {} {} binaries so far ...", count, progress);
        }
    }

    /**
     * Creates the work queue, the {@link TikaHelper} and the worker pool on first invocation;
     * subsequent invocations are no-ops.
     *
     * @throws IOException declared for failures while setting up the {@link TikaHelper}
     */
    private synchronized void initialize() throws IOException {
        if (initialized) {
            return;
        }
        inputQueue = new ArrayBlockingQueue<WorkItem>(queueSize);
        tika = new TikaHelper(tikaConfig);
        initializeExecutorService();
        initialized = true;
    }

    /**
     * Processes a single binary: skips it when its mime type is missing/unsupported or when the
     * writer reports the blob as already processed; otherwise parses it and records the result
     * (text, empty marker or error marker) through the {@link TextWriter}.
     *
     * @param source binary to process
     * @throws IOException declared for failures of the {@link TextWriter} calls
     */
    private void extractText(BinaryResource source) throws IOException {
        String type = source.getMimeType();
        if (type == null || !tika.isSupportedMediaType(type)) {
            log.trace("Ignoring binary content for node {} due to unsupported " +
                    "(or null) jcr:mimeType [{}]", source, type);
            notSupportedCount.incrementAndGet();
            return;
        }

        String blobId = source.getBlobId();
        if (textWriter.isProcessed(blobId)) {
            alreadyExtractedCount.incrementAndGet();
            return;
        }

        //TODO Handle case where same blob is being concurrently processed
        Metadata metadata = new Metadata();
        metadata.set(Metadata.CONTENT_TYPE, type);
        if (source.getEncoding() != null) { // not mandatory
            metadata.set(Metadata.CONTENT_ENCODING, source.getEncoding());
        }

        String extractedContent = parseStringValue(source.getByteSource(), metadata, source.getPath());
        if (ERROR_TEXT.equals(extractedContent)) {
            textWriter.markError(blobId);
        } else if (extractedContent != null) {
            extractedContent = extractedContent.trim();
            if (!extractedContent.isEmpty()) {
                nonEmptyExtractedTextSize.addAndGet(extractedContent.length());
                textWriter.write(blobId, extractedContent);
                textWrittenCount.incrementAndGet();
            } else {
                textWriter.markEmpty(blobId);
                emptyCount.incrementAndGet();
            }
        }
        // A null result (no bytes were read from the binary) is intentionally not recorded.
    }

    /** Creates the fixed-size pool and starts one long-running {@link Extractor} per thread. */
    private void initializeExecutorService() {
        executorService = Executors.newFixedThreadPool(threadPoolSize);
        for (int i = 0; i < threadPoolSize; i++) {
            executorService.submit(new Extractor());
        }
        log.info("Initialized text extractor pool with {} threads", threadPoolSize);
    }

    /**
     * Worker loop: takes items from the queue and extracts their text until the shutdown
     * signal is seen or the thread is interrupted. Failures on individual items are counted
     * and logged without stopping the worker.
     */
    private class Extractor implements Runnable {
        @Override
        public void run() {
            while (true) {
                WorkItem workItem = null;
                try {
                    workItem = inputQueue.take();
                    if (workItem == SHUTDOWN_SIGNAL) {
                        inputQueue.put(SHUTDOWN_SIGNAL); //put back for other workers
                        return;
                    }
                    extractText(workItem.source);
                    dumpProgress(processedCount.incrementAndGet());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    errorCount.incrementAndGet();
                    log.warn("Error occurred while processing {}", workItem, e);
                }
            }
        }
    }

    //~--------------------------------------< Tika >

    /**
     * Parses the given binary and returns the extracted text.
     *
     * @param byteSource source of the binary content; opened lazily on first read
     * @param metadata   metadata (content type and optionally encoding) handed to the parser
     * @param path       path of the binary, used only in log messages
     * @return the extracted text (truncated when the write limit was reached),
     *         {@link #ERROR_TEXT} when parsing failed, or {@code null} when no bytes were read
     */
    private String parseStringValue(ByteSource byteSource, Metadata metadata, String path) {
        WriteOutContentHandler handler = new WriteOutContentHandler(maxExtractedLength);
        long start = System.currentTimeMillis();
        long size = 0;
        try {
            CountingInputStream stream = new CountingInputStream(new LazyInputStream(byteSource));
            try {
                tika.getParser().parse(stream, handler, metadata, new ParseContext());
            } finally {
                // Capture the byte count even when parsing fails or stops early.
                size = stream.getCount();
                stream.close();
            }
        } catch (LinkageError e) {
            // Capture errors caused by extraction libraries
            // not being present. This is equivalent to disabling
            // selected media types in configuration, so we can simply
            // ignore these errors.
            log.debug("Failed to extract text from a binary property: {}."
                            + " This often happens when some media types are disabled by configuration."
                            + " The stack trace is included to flag some 'unintended' failures",
                    path, e);
            parserErrorCount.incrementAndGet();
            return ERROR_TEXT;
        } catch (Throwable t) {
            // Capture and report any other full text extraction problems.
            // The special STOP exception is used for normal termination.
            if (!handler.isWriteLimitReached(t)) {
                parserErrorCount.incrementAndGet();
                parserError.debug("Failed to extract text from a binary property: "
                        + path
                        + " This is a fairly common case, and nothing to"
                        + " worry about. The stack trace is included to"
                        + " help improve the text extraction feature.", t);
                return ERROR_TEXT;
            } else {
                // Write limit reached: fall through and return what was collected so far.
                parserError.debug("Extracted text size exceeded configured limit({})", maxExtractedLength);
            }
        }
        String result = handler.toString();
        timeTaken.addAndGet(System.currentTimeMillis() - start);
        if (size > 0) {
            extractedTextSize.addAndGet(result.length());
            extractionCount.incrementAndGet();
            totalSizeRead.addAndGet(size);
            return result;
        }
        return null;
    }

    //~--------------------------------------< WorkItem >

    /** Queue entry wrapping the binary to process; a {@code null} source marks the shutdown signal. */
    private static class WorkItem {
        final BinaryResource source;

        private WorkItem(BinaryResource source) {
            this.source = source;
        }

        @Override
        public String toString() {
            return source != null ? source.toString() : "<EMPTY>";
        }
    }
}