TIKA-3394 -- integrate pipes into tika-app
diff --git a/tika-app/pom.xml b/tika-app/pom.xml
index 57d4f28..33be600 100644
--- a/tika-app/pom.xml
+++ b/tika-app/pom.xml
@@ -64,6 +64,11 @@
<artifactId>tika-batch</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-emitter-fs</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- logging -->
<dependency>
diff --git a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
index 1581692..c4befeb 100644
--- a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
+++ b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
@@ -67,6 +67,7 @@
import org.apache.tika.config.TikaConfigSerializer;
import org.apache.tika.detect.CompositeDetector;
import org.apache.tika.detect.Detector;
+import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.extractor.DefaultEmbeddedStreamTranslator;
import org.apache.tika.extractor.EmbeddedDocumentExtractor;
@@ -95,6 +96,11 @@
import org.apache.tika.parser.RecursiveParserWrapper;
import org.apache.tika.parser.digestutils.CommonsDigester;
import org.apache.tika.parser.pdf.PDFParserConfig;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesException;
+import org.apache.tika.pipes.async.AsyncProcessor;
+import org.apache.tika.pipes.fetcher.Fetcher;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.sax.BodyContentHandler;
import org.apache.tika.sax.ContentHandlerFactory;
@@ -135,6 +141,9 @@
BatchProcessDriverCLI batchDriver = new BatchProcessDriverCLI(batchArgs);
batchDriver.execute();
return;
+ } else if (cli.testForAsync(args)) {
+ async(args);
+ return;
}
if (args.length > 0) {
@@ -159,6 +168,41 @@
}
}
+    /**
+     * Scans the raw command line for the async switch.
+     *
+     * @param args the command-line arguments to inspect
+     * @return <code>true</code> if any argument is exactly <code>-a</code> or
+     *         <code>--async</code>; <code>false</code> otherwise
+     */
+    private boolean testForAsync(String[] args) {
+        boolean asyncRequested = false;
+        // stop scanning as soon as the switch has been seen
+        for (int i = 0; i < args.length && !asyncRequested; i++) {
+            String candidate = args[i];
+            asyncRequested = candidate.equals("-a") || candidate.equals("--async");
+        }
+        return asyncRequested;
+    }
+
+    /**
+     * Runs Tika in async mode: builds a {@link PipesIterator} and an
+     * {@link AsyncProcessor} from the tika config file named by a
+     * <code>--config=</code> argument, feeds every tuple from the iterator to the
+     * processor and then polls until the processor reports that it is no longer active.
+     * <p>
+     * If several <code>--config=</code> arguments are given, the last one wins.
+     *
+     * @param args the command-line arguments; must contain <code>--config=PATH</code>
+     * @throws TikaConfigException if no non-empty <code>--config=</code> argument is present
+     * @throws InterruptedException if interrupted while offering tuples or while waiting
+     */
+    private static void async(String[] args) throws InterruptedException, PipesException,
+            TikaException,
+            IOException,
+            SAXException {
+        final String configPrefix = "--config=";
+        String tikaConfigPath = null;
+        for (String arg : args) {
+            if (arg.startsWith(configPrefix)) {
+                tikaConfigPath = arg.substring(configPrefix.length());
+            }
+        }
+        // fail early with a clear message instead of handing an empty path to the loaders
+        if (tikaConfigPath == null || tikaConfigPath.trim().isEmpty()) {
+            throw new TikaConfigException(
+                    "Async mode requires a tika config file; specify it with --config=<path>");
+        }
+        PipesIterator pipesIterator = PipesIterator.build(Paths.get(tikaConfigPath));
+        try (AsyncProcessor processor = new AsyncProcessor(Paths.get(tikaConfigPath))) {
+            for (FetchEmitTuple t : pipesIterator) {
+                // offer() returns false if the tuple could not be queued within the wait;
+                // keep retrying so that no tuple is silently dropped
+                // NOTE(review): this waits indefinitely if nothing ever drains the queue
+                while (!processor.offer(t, 2000)) {
+                    // queue still full -- try again
+                }
+            }
+            processor.finished();
+            while (processor.checkActive()) {
+                Thread.sleep(500);
+            }
+        }
+    }
+
private void extractInlineImagesFromPDFs() {
if (configFilePath == null && context.get(PDFParserConfig.class) == null) {
PDFParserConfig pdfParserConfig = new PDFParserConfig();
@@ -335,6 +379,8 @@
private DigestingParser.Digester digester = null;
+ private boolean asyncMode = false;
+
private boolean pipeMode = true;
private boolean fork = false;
@@ -395,7 +441,9 @@
// ignore, as container-aware detectors are now always used
} else if (arg.equals("-f") || arg.equals("--fork")) {
fork = true;
- } else if (arg.startsWith("--config=")) {
+ } else if (arg.equals("-a") || arg.equals("--async")) {
+ asyncMode = true;
+ } else if (arg.startsWith("--config=")) {
configFilePath = arg.substring("--config=".length());
} else if (arg.startsWith("--digest=")) {
digester = new CommonsDigester(MAX_MARK,
@@ -553,6 +601,8 @@
out.println(" -J or --jsonRecursive Output metadata and content from all");
out.println(" embedded files (choose content type");
out.println(" with -x, -h, -t or -m; default is -x)");
+ out.println(" -a or --async Run Tika in async mode; must specify details in a" +
+ " tikaConfig file");
out.println(" -l or --language Output only language");
out.println(" -d or --detect Detect document type");
out.println(" --digest=X Include digest X (md2, md5, sha1,");
diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
index df0bbc8..1ede66b 100644
--- a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
@@ -26,11 +26,15 @@
import java.io.File;
import java.io.PrintStream;
import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.tika.exception.TikaException;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
/**
@@ -43,9 +47,55 @@
private ByteArrayOutputStream errContent = null;
private PrintStream stdout = null;
private PrintStream stderr = null;
- private final File testDataFile = new File("src/test/resources/test-data");
+ private static final File testDataFile = new File("src/test/resources/test-data");
private final URI testDataURI = testDataFile.toURI();
private String resourcePrefix;
+ private static Path ASYNC_CONFIG;
+ private static Path ASYNC_OUTPUT_DIR;
+
+    /**
+     * Creates a temporary output directory and a temporary tika config file for the
+     * async test. The config declares an async section with three clients, a
+     * file-system fetcher and pipes iterator rooted at the test-data directory, and a
+     * file-system emitter rooted at the temporary output directory.
+     */
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        ASYNC_OUTPUT_DIR = Files.createTempDirectory("tika-cli-async-");
+        ASYNC_CONFIG = Files.createTempFile("async-config-", ".xml");
+
+        String configLocation = ASYNC_CONFIG.toAbsolutePath().toString();
+        String inputBase = testDataFile.getAbsolutePath();
+        String outputBase = ASYNC_OUTPUT_DIR.toAbsolutePath().toString();
+
+        StringBuilder xml = new StringBuilder("<properties>");
+        // async section: the config file points back at itself as the tikaConfig
+        xml.append("<async><params>");
+        xml.append("<numClients>3</numClients>");
+        xml.append("<tikaConfig>").append(configLocation).append("</tikaConfig>");
+        xml.append("</params></async>");
+        // fetcher reading from the test-data directory
+        xml.append("<fetchers>");
+        xml.append("<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">");
+        xml.append("<params><name>fsf</name>");
+        xml.append("<basePath>").append(inputBase).append("</basePath>");
+        xml.append("</params></fetcher>");
+        xml.append("</fetchers>");
+        // emitter writing into the temporary output directory
+        xml.append("<emitters>");
+        xml.append("<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">");
+        xml.append("<params><name>fse</name>");
+        xml.append("<basePath>").append(outputBase).append("</basePath>");
+        xml.append("</params></emitter>");
+        xml.append("</emitters>");
+        // iterator over the test-data directory, wired to the fetcher and emitter above
+        xml.append("<pipesIterator ");
+        xml.append("class=\"org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator\">");
+        xml.append("<params>");
+        xml.append("<basePath>").append(inputBase).append("</basePath>");
+        xml.append("<fetcherName>fsf</fetcherName>");
+        xml.append("<emitterName>fse</emitterName>");
+        xml.append("</params></pipesIterator>");
+        xml.append("</properties>");
+
+        Files.write(ASYNC_CONFIG, xml.toString().getBytes(UTF_8));
+    }
+
+    /**
+     * Removes the temporary async config file and output directory.
+     * The directory is deleted even if deleting the config file fails, and either
+     * step is skipped if the corresponding path was never created.
+     */
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        try {
+            if (ASYNC_CONFIG != null) {
+                Files.delete(ASYNC_CONFIG);
+            }
+        } finally {
+            // always attempt this so a failed file delete does not leak the directory
+            if (ASYNC_OUTPUT_DIR != null) {
+                FileUtils.deleteDirectory(ASYNC_OUTPUT_DIR.toFile());
+            }
+        }
+    }
/**
* reset resourcePrefix
@@ -574,6 +624,21 @@
assertTrue(content.contains("application/vnd.oasis.opendocument.text-web"));
}
+    /**
+     * Runs the CLI in async mode against the generated config and checks how many
+     * <code>.json</code> files end up in the async output directory.
+     */
+    @Test
+    public void testAsync() throws Exception {
+        // the returned stdout content is not inspected; only the emitted files matter
+        getParamOutContent("-a",
+                "--config=" + ASYNC_CONFIG.toAbsolutePath());
+
+        File[] emitted = ASYNC_OUTPUT_DIR.toFile().listFiles();
+        int jsonCount = 0;
+        for (int i = 0; i < emitted.length; i++) {
+            if (emitted[i].getName().endsWith(".json")) {
+                jsonCount++;
+            }
+        }
+        assertEquals(17, jsonCount);
+    }
+
+
/**
* reset outContent and errContent if they are not empty
* run given params in TikaCLI and return outContent String with UTF-8
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 45a3565..09826b0 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -193,7 +193,6 @@
logGobblerThread.interrupt();
}
ProcessBuilder pb = new ProcessBuilder(getCommandline());
-
process = pb.start();
logGobbler = new LogGobbler(process.getErrorStream());
logGobblerThread = new Thread(logGobbler);
@@ -251,7 +250,7 @@
commandLine.add(maxForEmitBatchBytes);
commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
commandLine.add(Long.toString(pipesConfig.getShutdownClientAfterMillis()));
-
+ LOG.debug("commandline: " + commandLine.toString());
return commandLine.toArray(new String[0]);
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index e939e85..69699d8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -38,6 +38,9 @@
try (InputStream is = Files.newInputStream(p)) {
asyncConfig.configure("async", is);
}
+ if (asyncConfig.getTikaConfig() == null) {
+ asyncConfig.setTikaConfig(p);
+ }
return asyncConfig;
}
@@ -74,7 +77,7 @@
* What is the maximum bytes size per extract that
* will be allowed in the emit queue. If an extract is too
* big, skip the emit queue and forward it directly from the processor. If
- * set to <code>0</code>, this will never send a an extract back for batch emitting,
+ * set to <code>0</code>, this will never send an extract back for batch emitting,
* but will emit the extract directly from the processor.
* @return
*/
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 5453581..467666b 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -52,29 +52,29 @@
private final ArrayBlockingQueue<EmitData> emitData;
private final ExecutorCompletionService<Integer> executorCompletionService;
private final ExecutorService executorService;
- private final int fetchEmitTupleQSize = 10000;
- private int numParserThreads = 10;
- private int numEmitterThreads = 2;
+ private final AsyncConfig asyncConfig;
private int numParserThreadsFinished = 0;
private boolean addedEmitterSemaphores = false;
private int finished = 0;
boolean isShuttingDown = false;
public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException, SAXException {
- this.fetchEmitTuples = new ArrayBlockingQueue<>(fetchEmitTupleQSize);
+ this.asyncConfig = AsyncConfig.load(tikaConfigPath);
+ this.fetchEmitTuples = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
this.emitData = new ArrayBlockingQueue<>(100);
- this.executorService = Executors.newFixedThreadPool(numParserThreads + numEmitterThreads);
+ this.executorService = Executors.newFixedThreadPool(
+ asyncConfig.getNumClients() + asyncConfig.getNumEmitters());
this.executorCompletionService =
new ExecutorCompletionService<>(executorService);
- AsyncConfig asyncConfig = AsyncConfig.load(tikaConfigPath);
- for (int i = 0; i < numParserThreads; i++) {
+
+ for (int i = 0; i < asyncConfig.getNumClients(); i++) {
executorCompletionService.submit(new FetchEmitWorker(asyncConfig, fetchEmitTuples,
emitData));
}
EmitterManager emitterManager = EmitterManager.load(tikaConfigPath);
- for (int i = 0; i < numEmitterThreads; i++) {
+ for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
executorCompletionService.submit(new AsyncEmitter(emitData, emitterManager));
}
}
@@ -85,9 +85,9 @@
throw new IllegalStateException(
"Can't call offer after calling close() or " + "shutdownNow()");
}
- if (newFetchEmitTuples.size() > fetchEmitTupleQSize) {
+ if (newFetchEmitTuples.size() > asyncConfig.getQueueSize()) {
throw new OfferLargerThanQueueSize(newFetchEmitTuples.size(),
- fetchEmitTupleQSize);
+ asyncConfig.getQueueSize());
}
long start = System.currentTimeMillis();
long elapsed = System.currentTimeMillis() - start;
@@ -124,6 +124,13 @@
return fetchEmitTuples.offer(t, offerMs, TimeUnit.MILLISECONDS);
}
+    /**
+     * Signals that no more tuples will be offered by queueing one
+     * {@link PipesIterator#COMPLETED_SEMAPHORE} per configured client.
+     * <p>
+     * A blocking <code>put</code> is used on purpose: the non-blocking
+     * <code>offer</code> returns <code>false</code> when the bounded queue is full,
+     * which would silently drop a semaphore. Presumably a worker that never receives
+     * its semaphore never completes, so {@link #checkActive()} would stay true.
+     *
+     * @throws InterruptedException if interrupted while waiting for space in the queue
+     */
+    public void finished() throws InterruptedException {
+        for (int i = 0; i < asyncConfig.getNumClients(); i++) {
+            // blocks until there is room; waits indefinitely if nothing drains the queue
+            fetchEmitTuples.put(PipesIterator.COMPLETED_SEMAPHORE);
+        }
+    }
+
public boolean checkActive() {
Future<Integer> future = executorCompletionService.poll();
@@ -139,13 +146,14 @@
}
finished++;
}
- if (numParserThreadsFinished == numParserThreads && ! addedEmitterSemaphores) {
- for (int i = 0; i < numEmitterThreads; i++) {
+ if (numParserThreadsFinished == asyncConfig.getNumClients() && ! addedEmitterSemaphores) {
+ for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
+ //can hang forever
emitData.offer(AsyncEmitter.EMIT_DATA_STOP_SEMAPHORE);
}
addedEmitterSemaphores = true;
}
- return finished != (numEmitterThreads + numParserThreads);
+ return finished != (asyncConfig.getNumClients() + asyncConfig.getNumEmitters());
}
@Override
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FileSystemFetcher.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
similarity index 97%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/FileSystemFetcher.java
rename to tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
index e4f50d2..5c22e7f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FileSystemFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetcher;
+package org.apache.tika.pipes.fetcher.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -33,6 +33,7 @@
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
public class FileSystemFetcher extends AbstractFetcher implements Initializable {
diff --git a/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java b/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
index 7777278..b27a82f 100644
--- a/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
+++ b/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
@@ -28,7 +28,7 @@
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
-import org.apache.tika.pipes.fetcher.FileSystemFetcher;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
public class TikaPipesConfigTest extends AbstractTikaConfigTest {
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 3040d3a..4dd8a79 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -62,7 +62,7 @@
" <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
" <params>\n" + " <name>mock</name>\n" + " </params>" + " </emitter>" +
" </emitters>" + " <fetchers>" +
- " <fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
+ " <fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
" <params><name>mock</name>\n" + " <basePath>" +
ProcessUtils.escapeCommandLine(inputDir.toAbsolutePath().toString()) +
"</basePath></params>\n" + " </fetcher>" + " </fetchers>" +
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/FileSystemFetcherTest.java b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
similarity index 97%
rename from tika-core/src/test/java/org/apache/tika/pipes/fetcher/FileSystemFetcherTest.java
rename to tika-core/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
index 4e03117..36dfdaf 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/FileSystemFetcherTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetcher;
+package org.apache.tika.pipes.fetcher.fs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
diff --git a/tika-core/src/test/resources/org/apache/tika/config/fetchers-config.xml b/tika-core/src/test/resources/org/apache/tika/config/fetchers-config.xml
index cd53dd9..6858eef 100644
--- a/tika-core/src/test/resources/org/apache/tika/config/fetchers-config.xml
+++ b/tika-core/src/test/resources/org/apache/tika/config/fetchers-config.xml
@@ -17,13 +17,13 @@
-->
<properties>
<fetchers>
- <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
<params>
<name>fs1</name>
<basePath>/my/base/path1</basePath>
</params>
</fetcher>
- <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
<params>
<name>fs2</name>
<basePath>/my/base/path2</basePath>
diff --git a/tika-core/src/test/resources/org/apache/tika/config/fetchers-duplicate-config.xml b/tika-core/src/test/resources/org/apache/tika/config/fetchers-duplicate-config.xml
index 0d4a2c0..64d785f 100644
--- a/tika-core/src/test/resources/org/apache/tika/config/fetchers-duplicate-config.xml
+++ b/tika-core/src/test/resources/org/apache/tika/config/fetchers-duplicate-config.xml
@@ -17,13 +17,13 @@
-->
<properties>
<fetchers>
- <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
<params>
<name>fs1</name>
<basePath>/my/base/path1</basePath>
</params>
</fetcher>
- <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
<params>
<name>fs1</name>
<basePath>/my/base/path2</basePath>
diff --git a/tika-core/src/test/resources/org/apache/tika/config/fetchers-nobasepath-config.xml b/tika-core/src/test/resources/org/apache/tika/config/fetchers-nobasepath-config.xml
index 2396ccc..0cd79e4 100644
--- a/tika-core/src/test/resources/org/apache/tika/config/fetchers-nobasepath-config.xml
+++ b/tika-core/src/test/resources/org/apache/tika/config/fetchers-nobasepath-config.xml
@@ -17,13 +17,13 @@
-->
<properties>
<fetchers>
- <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
<params>
<name>fs1</name>
<basePath>/my/base/path1</basePath>
</params>
</fetcher>
- <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
<params>
<name>fs2</name>
</params>
diff --git a/tika-core/src/test/resources/org/apache/tika/config/fetchers-noname-config.xml b/tika-core/src/test/resources/org/apache/tika/config/fetchers-noname-config.xml
index cd17e42..d07aacc 100644
--- a/tika-core/src/test/resources/org/apache/tika/config/fetchers-noname-config.xml
+++ b/tika-core/src/test/resources/org/apache/tika/config/fetchers-noname-config.xml
@@ -17,13 +17,13 @@
-->
<properties>
<fetchers>
- <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
<params>
<name>fs1</name>
<basePath>/my/base/path1</basePath>
</params>
</fetcher>
- <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
<params>
<basePath>/my/base/path2</basePath>
</params>
diff --git a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
index 62e692f..012d0c6 100644
--- a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
+++ b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
@@ -28,7 +28,7 @@
</pipesIterator>
</pipesIterators>
<fetchers>
- <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
<params>
<name>fs</name>
<basePath>fix</basePath>
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
index e82ff55..f6d37de 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
@@ -102,7 +102,7 @@
TIKA_CONFIG_XML =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>" + "<fetchers>" +
- "<fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
+ "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
"<params>" + "<name>fsf</name>" +
"<basePath>" + inputDir.toAbsolutePath() +
"</basePath>" + "</params>" + "</fetcher>" + "</fetchers>" + "<emitters>" +
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index 2724c70..841d8b3 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -100,7 +100,7 @@
TIKA_CONFIG_XML =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>" + "<fetchers>" +
- "<fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
+ "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
"<params>" + "<name>" + FETCHER_NAME +
"</name>" + "<basePath>" +
inputDir.toAbsolutePath() + "</basePath>" + "</params>" + "</fetcher>" +
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
index 5ff0f11..8aba852 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
@@ -83,7 +83,7 @@
TIKA_CONFIG_TIMEOUT = TMP_DIR.resolve("tika-config-timeout.xml");
//TODO -- clean this up so that port is sufficient and we don't need portString
String xml1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>" + "<fetchers>" +
- "<fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" + "<params>" +
+ "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" + "<params>" +
"<name>" + FETCHER_NAME + "</name>" +
"<basePath>" + inputDir.toAbsolutePath() +
"</basePath>" + "</params>" + "</fetcher>" + "</fetchers>" + "<emitters>" +