blob: 54e32ce3fbdd0f49f0f002c747b93592e1e38bdc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika;
import static org.junit.Assert.assertEquals;
import java.io.FileFilter;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tika.detect.Detector;
import org.apache.tika.detect.XmlRootExtractor;
import org.apache.tika.exception.TikaException;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.mime.MediaType;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.sax.RecursiveParserWrapperHandler;
import org.apache.tika.utils.XMLReaderUtils;
public class MultiThreadedTikaTest extends TikaTest {
//TODO: figure out how to make failures reproducible a la Lucene/Solr with a seed
//TODO: Consider randomizing the Locale and timezone, like Lucene/Solr...
XmlRootExtractor ex = new XmlRootExtractor();
public static Path[] getTestFiles(final FileFilter fileFilter)
throws URISyntaxException, IOException {
Path root = Paths.get(MultiThreadedTikaTest.class.getResource("/test-documents").toURI());
final List<Path> files = new ArrayList<>();
Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
if (fileFilter != null && !fileFilter.accept(file.toFile())) {
return FileVisitResult.CONTINUE;
}
if (!attrs.isDirectory()) {
files.add(file);
}
return FileVisitResult.CONTINUE;
}
});
return files.toArray(new Path[0]);
}
private static ConcurrentHashMap<Path, MediaType> getBaselineDetection(Detector detector,
Path[] files) {
ConcurrentHashMap<Path, MediaType> baseline = new ConcurrentHashMap<>();
XmlRootExtractor extractor = new XmlRootExtractor();
for (Path f : files) {
Metadata metadata = new Metadata();
try (TikaInputStream tis = TikaInputStream.get(f, metadata)) {
baseline.put(f, detector.detect(tis, metadata));
baseline.put(f, detector.detect(tis, metadata));
} catch (IOException e) {
e.printStackTrace();
}
}
return baseline;
}
private static ConcurrentHashMap<Path, Extract> getBaseline(Parser parser, Path[] files,
ParseContext parseContext) {
ConcurrentHashMap<Path, Extract> baseline = new ConcurrentHashMap<>();
for (Path f : files) {
try (TikaInputStream is = TikaInputStream.get(f)) {
List<Metadata> metadataList = getRecursiveMetadata(is, parser, parseContext);
baseline.put(f, new Extract(metadataList));
} catch (Exception e) {
e.printStackTrace();
//swallow
}
}
return baseline;
}
private static List<Metadata> getRecursiveMetadata(InputStream is, Parser parser,
ParseContext parseContext) throws Exception {
//different from parent TikaTest in that this extracts text.
//can't extract xhtml because "tmp" file names wind up in
//content's metadata and they'll differ by file.
parseContext = new ParseContext();
RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
new BasicContentHandlerFactory(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1),
-1);
parser.parse(is, handler, new Metadata(), parseContext);
return handler.getMetadataList();
}
private static void assertExtractEquals(Extract extractA, Extract extractB) {
//this currently only checks the basics
//might want to add more checks
assertEquals("number of embedded files", extractA.metadataList.size(),
extractB.metadataList.size());
for (int i = 0; i < extractA.metadataList.size(); i++) {
assertEquals("number of metadata elements in attachment: " + i,
extractA.metadataList.get(i).size(), extractB.metadataList.get(i).size());
assertEquals("content in attachment: " + i,
extractA.metadataList.get(i).get(TikaCoreProperties.TIKA_CONTENT),
extractB.metadataList.get(i).get(TikaCoreProperties.TIKA_CONTENT));
}
}
/**
* This calls {@link #testEach(Parser parser, Path[], ParseContext[], int, int)} and
* then {@link #testAll(Parser parser, Path[], ParseContext[], int, int)}
*
* @param numThreads number of threads to use
* @param numIterations number of iterations per thread
* @param filter file filter to select files from "/test-documents"; if
* <code>null</code>,
* all files will be used
* @throws Exception
*/
protected void testMultiThreaded(Parser parser, ParseContext[] parseContext, int numThreads,
int numIterations, FileFilter filter) throws Exception {
Path[] allFiles = getTestFiles(filter);
testEach(parser, allFiles, parseContext, numThreads, numIterations);
testAll(parser, allFiles, parseContext, numThreads, numIterations);
}
public void testDetector(Detector detector, int numThreads, int numIterations,
FileFilter filter, int randomlyResizeSAXPool) throws Exception {
Path[] files = getTestFiles(filter);
testDetectorEach(detector, files, numThreads, numIterations, randomlyResizeSAXPool);
testDetectorOnAll(detector, files, numThreads, numIterations, randomlyResizeSAXPool);
}
void testDetectorEach(Detector detector, Path[] files, int numThreads, int numIterations,
int randomlyResizeSAXPool) {
for (Path p : files) {
Path[] toTest = new Path[1];
toTest[0] = p;
testDetectorOnAll(detector, toTest, numThreads, numIterations, randomlyResizeSAXPool);
}
}
private void testDetectorOnAll(Detector detector, Path[] toTest, int numThreads,
int numIterations, int randomlyResizeSAXPool) {
Map<Path, MediaType> truth = getBaselineDetection(detector, toTest);
//if all files caused an exception
if (truth.size() == 0) {
return;
}
//only those that parsed without exception
Path[] testFiles = new Path[truth.size()];
int j = 0;
for (Path testFile : truth.keySet()) {
testFiles[j++] = testFile;
}
int actualThreadCount = numThreads + Math.max(randomlyResizeSAXPool, 0);
ExecutorService ex = Executors.newFixedThreadPool(actualThreadCount);
try {
_testDetectorOnAll(detector, testFiles, numThreads, numIterations, truth, ex,
randomlyResizeSAXPool);
} finally {
ex.shutdown();
ex.shutdownNow();
}
}
private void _testDetectorOnAll(Detector detector, Path[] testFiles, int numThreads,
int numIterations, Map<Path, MediaType> truth,
ExecutorService ex, int randomlyResizeSAXPool) {
ExecutorCompletionService<Integer> executorCompletionService =
new ExecutorCompletionService<>(ex);
executorCompletionService.submit(new SAXPoolResizer(randomlyResizeSAXPool));
for (int i = 0; i < numThreads; i++) {
executorCompletionService
.submit(new TikaDetectorRunner(detector, numIterations, testFiles, truth));
}
int completed = 0;
while (completed < numThreads) {
//TODO: add a maximum timeout threshold
Future<Integer> future = null;
try {
future = executorCompletionService.poll(1000, TimeUnit.MILLISECONDS);
if (future != null) {
future.get();//trigger exceptions from thread
completed++;
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
ex.shutdown();
ex.shutdownNow();
}
/**
* Test each file, one at a time in multiple threads.
* This was required to test TIKA-2519 in a reasonable
* amount of time. This forced the parser to use the
* same underlying memory structures because it was the same file.
* This is stricter than I think our agreement with clients is
* because this run tests on literally the same file and
* not a copy of the file per thread. Let's leave this as is
* unless there's a good reason to create a separate copy per thread.
*
* @param files files to test, one at a time
* @param numThreads number of threads to use
* @param numIterations number of iterations per thread
*/
protected void testEach(Parser parser, Path[] files, ParseContext[] parseContext,
int numThreads, int numIterations) {
for (Path p : files) {
Path[] toTest = new Path[1];
toTest[0] = p;
testAll(parser, toTest, parseContext, numThreads, numIterations);
}
}
/**
* This tests all files together. Each parser randomly selects
* a file from the array. Two parsers could wind up parsing the
* same file at the same time. Good.
* <p>
* In the current implementation, this gets ground truth only
* from files that do not throw exceptions. This will ignore
* files that cause exceptions.
*
* @param files files to parse
* @param numThreads number of parser threads
* @param numIterations number of iterations per parser
*/
protected void testAll(Parser parser, Path[] files, ParseContext[] parseContext, int numThreads,
int numIterations) {
Map<Path, Extract> truth = getBaseline(parser, files, parseContext[0]);
//if all files caused an exception
if (truth.size() == 0) {
//return;
}
//only those that parsed without exception
Path[] testFiles = new Path[truth.size()];
int j = 0;
for (Path testFile : truth.keySet()) {
testFiles[j++] = testFile;
}
ExecutorService ex = Executors.newFixedThreadPool(numThreads);
try {
_testAll(parser, files, parseContext, numThreads, numIterations, truth, ex);
} finally {
ex.shutdown();
ex.shutdownNow();
}
}
private void _testAll(Parser parser, Path[] testFiles, ParseContext[] parseContext,
int numThreads, int numIterations, Map<Path, Extract> truth,
ExecutorService ex) {
ExecutorCompletionService<Integer> executorCompletionService =
new ExecutorCompletionService<>(ex);
//use the same parser in all threads
for (int i = 0; i < numThreads; i++) {
executorCompletionService
.submit(new TikaRunner(parser, parseContext[i], numIterations, testFiles,
truth));
}
int completed = 0;
while (completed < numThreads) {
//TODO: add a maximum timeout threshold
Future<Integer> future = null;
try {
future = executorCompletionService.poll(1000, TimeUnit.MILLISECONDS);
if (future != null) {
future.get();//trigger exceptions from thread
completed++;
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
//TODO: make this return something useful besides an integer
private static class TikaRunner implements Callable<Integer> {
private static final AtomicInteger threadCount = new AtomicInteger(0);
private final Parser parser;
private final int iterations;
private final Path[] files;
private final Map<Path, Extract> truth;
private final ParseContext parseContext;
private final Random random = new Random();
private final int threadNumber;
private TikaRunner(Parser parser, ParseContext parseContext, int iterations, Path[] files,
Map<Path, Extract> truth) {
this.parser = parser;
this.iterations = iterations;
this.files = files;
this.truth = truth;
this.parseContext = parseContext;
threadNumber = threadCount.getAndIncrement();
}
@Override
public Integer call() throws Exception {
for (int i = 0; i < iterations; i++) {
int randIndex = random.nextInt(files.length);
Path testFile = files[randIndex];
List<Metadata> metadataList = null;
boolean success = false;
try (InputStream is = Files.newInputStream(testFile)) {
metadataList = getRecursiveMetadata(is, parser, new ParseContext());
success = true;
} catch (Exception e) {
//swallow
//throw new RuntimeException(testFile + " triggered this exception", e);
}
if (success) {
assertExtractEquals(truth.get(testFile), new Extract(metadataList));
}
}
return 1;
}
}
private static class Extract {
final List<Metadata> metadataList;
private Extract(List<Metadata> metadataList) {
this.metadataList = metadataList;
}
}
private static class SAXPoolResizer implements Callable<Integer> {
private final int maxResize;
private final Random rand = new Random();
SAXPoolResizer(int maxResize) {
this.maxResize = maxResize;
}
public Integer call() throws TikaException {
int resized = 0;
while (true) {
try {
Thread.yield();
Thread.sleep(500);
} catch (InterruptedException e) {
return resized;
}
if (maxResize > 0 && rand.nextFloat() > 0.01) {
int sz = rand.nextInt(maxResize) + 1;
XMLReaderUtils.setPoolSize(sz);
resized++;
}
}
}
}
private static class TikaDetectorRunner implements Callable<Integer> {
private final Detector detector;
private final int iterations;
private final Path[] files;
private final Map<Path, MediaType> truth;
private final Random random = new Random();
private TikaDetectorRunner(Detector detector, int iterations, Path[] files,
Map<Path, MediaType> truth) {
this.detector = detector;
this.iterations = iterations;
this.files = files;
this.truth = truth;
}
@Override
public Integer call() throws Exception {
for (int i = 0; i < iterations; i++) {
int randIndex = random.nextInt(files.length);
Path testFile = files[randIndex];
Metadata metadata = new Metadata();
try (TikaInputStream tis = TikaInputStream.get(testFile, metadata)) {
MediaType mediaType = detector.detect(tis, metadata);
assertEquals("failed on: " + testFile.getFileName(), truth.get(testFile),
mediaType);
}
}
return 1;
}
}
}