TIKA-3394 -- integrate pipes into tika-app
diff --git a/tika-app/pom.xml b/tika-app/pom.xml
index 57d4f28..33be600 100644
--- a/tika-app/pom.xml
+++ b/tika-app/pom.xml
@@ -64,6 +64,11 @@
       <artifactId>tika-batch</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-emitter-fs</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <!-- logging -->
     <dependency>
diff --git a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
index 1581692..c4befeb 100644
--- a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
+++ b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
@@ -67,6 +67,7 @@
 import org.apache.tika.config.TikaConfigSerializer;
 import org.apache.tika.detect.CompositeDetector;
 import org.apache.tika.detect.Detector;
+import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.extractor.DefaultEmbeddedStreamTranslator;
 import org.apache.tika.extractor.EmbeddedDocumentExtractor;
@@ -95,6 +96,11 @@
 import org.apache.tika.parser.RecursiveParserWrapper;
 import org.apache.tika.parser.digestutils.CommonsDigester;
 import org.apache.tika.parser.pdf.PDFParserConfig;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesException;
+import org.apache.tika.pipes.async.AsyncProcessor;
+import org.apache.tika.pipes.fetcher.Fetcher;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
 import org.apache.tika.sax.BasicContentHandlerFactory;
 import org.apache.tika.sax.BodyContentHandler;
 import org.apache.tika.sax.ContentHandlerFactory;
@@ -135,6 +141,9 @@
             BatchProcessDriverCLI batchDriver = new BatchProcessDriverCLI(batchArgs);
             batchDriver.execute();
             return;
+        } else if (cli.testForAsync(args)) {
+            async(args);
+            return;
         }
 
         if (args.length > 0) {
@@ -159,6 +168,41 @@
         }
     }
 
+    private boolean testForAsync(String[] args) {
+        for (String arg : args) {
+            if (arg.equals("-a") || arg.equals("--async")) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static void async(String[] args) throws InterruptedException, PipesException,
+            TikaException,
+            IOException,
+            SAXException {
+        String tikaConfigPath = "";
+        for (String arg : args) {
+            if (arg.startsWith("--config=")) {
+                tikaConfigPath = arg.substring(9);
+            }
+        }
+        PipesIterator pipesIterator = PipesIterator.build(Paths.get(tikaConfigPath));
+        try (AsyncProcessor processor = new AsyncProcessor(Paths.get(tikaConfigPath))) {
+            for (FetchEmitTuple t : pipesIterator) {
+                processor.offer(t, 2000);
+            }
+            processor.finished();
+            while (true) {
+                if (processor.checkActive()) {
+                    Thread.sleep(500);
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+
     private void extractInlineImagesFromPDFs() {
         if (configFilePath == null && context.get(PDFParserConfig.class) == null) {
             PDFParserConfig pdfParserConfig = new PDFParserConfig();
@@ -335,6 +379,8 @@
 
     private DigestingParser.Digester digester = null;
 
+    private boolean asyncMode = false;
+
     private boolean pipeMode = true;
 
     private boolean fork = false;
@@ -395,7 +441,9 @@
             // ignore, as container-aware detectors are now always used
         } else if (arg.equals("-f") || arg.equals("--fork")) {
             fork = true;
-        } else if (arg.startsWith("--config=")) {
+        } else if (arg.equals("-a") || arg.equals("--async")) {
+            asyncMode = true;
+        } else  if (arg.startsWith("--config=")) {
             configFilePath = arg.substring("--config=".length());
         } else if (arg.startsWith("--digest=")) {
             digester = new CommonsDigester(MAX_MARK,
@@ -553,6 +601,8 @@
         out.println("    -J  or --jsonRecursive Output metadata and content from all");
         out.println("                           embedded files (choose content type");
         out.println("                           with -x, -h, -t or -m; default is -x)");
+        out.println("    -a  or --async         Run Tika in async mode; must specify details in a" +
+                " tikaConfig file");
         out.println("    -l  or --language      Output only language");
         out.println("    -d  or --detect        Detect document type");
         out.println("           --digest=X      Include digest X (md2, md5, sha1,");
diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
index df0bbc8..1ede66b 100644
--- a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
@@ -26,11 +26,15 @@
 import java.io.File;
 import java.io.PrintStream;
 import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.tika.exception.TikaException;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
@@ -43,9 +47,55 @@
     private ByteArrayOutputStream errContent = null;
     private PrintStream stdout = null;
     private PrintStream stderr = null;
-    private final File testDataFile = new File("src/test/resources/test-data");
+    private static final File testDataFile = new File("src/test/resources/test-data");
     private final URI testDataURI = testDataFile.toURI();
     private String resourcePrefix;
+    private static Path ASYNC_CONFIG;
+    private static Path ASYNC_OUTPUT_DIR;
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        ASYNC_OUTPUT_DIR = Files.createTempDirectory("tika-cli-async-");
+        ASYNC_CONFIG = Files.createTempFile("async-config-", ".xml");
+        String xml = "<properties>" +
+                "<async>" +
+                "<params>" +
+                "<numClients>3</numClients>" +
+                "<tikaConfig>" + ASYNC_CONFIG.toAbsolutePath() + "</tikaConfig>" +
+                "</params>" +
+                "</async>" +
+                "<fetchers>" +
+                "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
+                "<params>" +
+                "<name>fsf</name>" +
+                "<basePath>" + testDataFile.getAbsolutePath() + "</basePath>" +
+                "</params>" +
+                "</fetcher>" +
+                "</fetchers>" +
+                "<emitters>" +
+                "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
+                "<params>" +
+                "<name>fse</name>" +
+                "<basePath>" + ASYNC_OUTPUT_DIR.toAbsolutePath() + "</basePath>" +
+                "</params></emitter>" +
+                "</emitters>" +
+                "<pipesIterator " +
+                "class=\"org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator\">" +
+                "<params>" +
+                "<basePath>" + testDataFile.getAbsolutePath() + "</basePath>" +
+                "<fetcherName>fsf</fetcherName>" +
+                "<emitterName>fse</emitterName>" +
+                "</params>" +
+                "</pipesIterator>" +
+                "</properties>";
+        Files.write(ASYNC_CONFIG, xml.getBytes(UTF_8));
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        Files.delete(ASYNC_CONFIG);
+        FileUtils.deleteDirectory(ASYNC_OUTPUT_DIR.toFile());
+    }
 
     /**
      * reset resourcePrefix
@@ -574,6 +624,21 @@
         assertTrue(content.contains("application/vnd.oasis.opendocument.text-web"));
     }
 
+    @Test
+    public void testAsync() throws Exception {
+        String content = getParamOutContent("-a",
+                "--config="+ASYNC_CONFIG.toAbsolutePath());
+
+        int json = 0;
+        for (File f : ASYNC_OUTPUT_DIR.toFile().listFiles()) {
+            if (f.getName().endsWith(".json")) {
+                json++;
+            }
+        }
+        assertEquals(17, json);
+    }
+
+
     /**
      * reset outContent and errContent if they are not empty
      * run given params in TikaCLI and return outContent String with UTF-8
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 45a3565..09826b0 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -193,7 +193,6 @@
             logGobblerThread.interrupt();
         }
         ProcessBuilder pb = new ProcessBuilder(getCommandline());
-
         process = pb.start();
         logGobbler = new LogGobbler(process.getErrorStream());
         logGobblerThread = new Thread(logGobbler);
@@ -251,7 +250,7 @@
         commandLine.add(maxForEmitBatchBytes);
         commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
         commandLine.add(Long.toString(pipesConfig.getShutdownClientAfterMillis()));
-
+        LOG.debug("commandline: " + commandLine.toString());
         return commandLine.toArray(new String[0]);
     }
 
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index e939e85..69699d8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -38,6 +38,9 @@
         try (InputStream is = Files.newInputStream(p)) {
             asyncConfig.configure("async", is);
         }
+        if (asyncConfig.getTikaConfig() == null) {
+            asyncConfig.setTikaConfig(p);
+        }
         return asyncConfig;
     }
 
@@ -74,7 +77,7 @@
      *  What is the maximum bytes size per extract that
      *  will be allowed in the emit queue.  If an extract is too
      *  big, skip the emit queue and forward it directly from the processor.  If
-     *  set to <code>0</code>, this will never send a an extract back for batch emitting,
+     *  set to <code>0</code>, this will never send an extract back for batch emitting,
      *  but will emit the extract directly from the processor.
      * @return
      */
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 5453581..467666b 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -52,29 +52,29 @@
     private final ArrayBlockingQueue<EmitData> emitData;
     private final ExecutorCompletionService<Integer> executorCompletionService;
     private final ExecutorService executorService;
-    private final int fetchEmitTupleQSize = 10000;
-    private int numParserThreads = 10;
-    private int numEmitterThreads = 2;
+    private final AsyncConfig asyncConfig;
     private int numParserThreadsFinished = 0;
     private boolean addedEmitterSemaphores = false;
     private int finished = 0;
     boolean isShuttingDown = false;
 
     public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException, SAXException {
-        this.fetchEmitTuples = new ArrayBlockingQueue<>(fetchEmitTupleQSize);
+        this.asyncConfig = AsyncConfig.load(tikaConfigPath);
+        this.fetchEmitTuples = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
         this.emitData = new ArrayBlockingQueue<>(100);
-        this.executorService = Executors.newFixedThreadPool(numParserThreads + numEmitterThreads);
+        this.executorService = Executors.newFixedThreadPool(
+                asyncConfig.getNumClients() + asyncConfig.getNumEmitters());
         this.executorCompletionService =
                 new ExecutorCompletionService<>(executorService);
 
-        AsyncConfig asyncConfig = AsyncConfig.load(tikaConfigPath);
-        for (int i = 0; i < numParserThreads; i++) {
+
+        for (int i = 0; i < asyncConfig.getNumClients(); i++) {
             executorCompletionService.submit(new FetchEmitWorker(asyncConfig, fetchEmitTuples,
                     emitData));
         }
 
         EmitterManager emitterManager = EmitterManager.load(tikaConfigPath);
-        for (int i = 0; i < numEmitterThreads; i++) {
+        for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
             executorCompletionService.submit(new AsyncEmitter(emitData, emitterManager));
         }
     }
@@ -85,9 +85,9 @@
             throw new IllegalStateException(
                     "Can't call offer after calling close() or " + "shutdownNow()");
         }
-        if (newFetchEmitTuples.size() > fetchEmitTupleQSize) {
+        if (newFetchEmitTuples.size() > asyncConfig.getQueueSize()) {
             throw new OfferLargerThanQueueSize(newFetchEmitTuples.size(),
-                    fetchEmitTupleQSize);
+                    asyncConfig.getQueueSize());
         }
         long start = System.currentTimeMillis();
         long elapsed = System.currentTimeMillis() - start;
@@ -124,6 +124,13 @@
         return fetchEmitTuples.offer(t, offerMs, TimeUnit.MILLISECONDS);
     }
 
+    public void finished() throws InterruptedException {
+        for (int i = 0; i < asyncConfig.getNumClients(); i++) {
+            //can hang forever
+            fetchEmitTuples.offer(PipesIterator.COMPLETED_SEMAPHORE);
+        }
+    }
+
     public boolean checkActive() {
 
         Future<Integer> future = executorCompletionService.poll();
@@ -139,13 +146,14 @@
             }
             finished++;
         }
-        if (numParserThreadsFinished == numParserThreads && ! addedEmitterSemaphores) {
-            for (int i = 0; i < numEmitterThreads; i++) {
+        if (numParserThreadsFinished == asyncConfig.getNumClients() && ! addedEmitterSemaphores) {
+            for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
+                //can hang forever
                 emitData.offer(AsyncEmitter.EMIT_DATA_STOP_SEMAPHORE);
             }
             addedEmitterSemaphores = true;
         }
-        return finished != (numEmitterThreads + numParserThreads);
+        return finished != (asyncConfig.getNumClients() + asyncConfig.getNumEmitters());
     }
 
     @Override
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FileSystemFetcher.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
similarity index 97%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/FileSystemFetcher.java
rename to tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
index e4f50d2..5c22e7f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FileSystemFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetcher;
+package org.apache.tika.pipes.fetcher.fs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -33,6 +33,7 @@
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
 
 public class FileSystemFetcher extends AbstractFetcher implements Initializable {
 
diff --git a/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java b/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
index 7777278..b27a82f 100644
--- a/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
+++ b/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
@@ -28,7 +28,7 @@
 import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
-import org.apache.tika.pipes.fetcher.FileSystemFetcher;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
 import org.apache.tika.pipes.pipesiterator.PipesIterator;
 
 public class TikaPipesConfigTest extends AbstractTikaConfigTest {
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 3040d3a..4dd8a79 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -62,7 +62,7 @@
                 "  <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
                 "    <params>\n" + "      <name>mock</name>\n" + "    </params>" + "  </emitter>" +
                 "  </emitters>" + "  <fetchers>" +
-                "    <fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
+                "    <fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
                 "      <params><name>mock</name>\n" + "      <basePath>" +
                 ProcessUtils.escapeCommandLine(inputDir.toAbsolutePath().toString()) +
                 "</basePath></params>\n" + "    </fetcher>" + "  </fetchers>" +
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/FileSystemFetcherTest.java b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
similarity index 97%
rename from tika-core/src/test/java/org/apache/tika/pipes/fetcher/FileSystemFetcherTest.java
rename to tika-core/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
index 4e03117..36dfdaf 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/FileSystemFetcherTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetcher;
+package org.apache.tika.pipes.fetcher.fs;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
diff --git a/tika-core/src/test/resources/org/apache/tika/config/fetchers-config.xml b/tika-core/src/test/resources/org/apache/tika/config/fetchers-config.xml
index cd53dd9..6858eef 100644
--- a/tika-core/src/test/resources/org/apache/tika/config/fetchers-config.xml
+++ b/tika-core/src/test/resources/org/apache/tika/config/fetchers-config.xml
@@ -17,13 +17,13 @@
 -->
 <properties>
   <fetchers>
-    <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
       <params>
         <name>fs1</name>
         <basePath>/my/base/path1</basePath>
       </params>
     </fetcher>
-    <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
       <params>
         <name>fs2</name>
         <basePath>/my/base/path2</basePath>
diff --git a/tika-core/src/test/resources/org/apache/tika/config/fetchers-duplicate-config.xml b/tika-core/src/test/resources/org/apache/tika/config/fetchers-duplicate-config.xml
index 0d4a2c0..64d785f 100644
--- a/tika-core/src/test/resources/org/apache/tika/config/fetchers-duplicate-config.xml
+++ b/tika-core/src/test/resources/org/apache/tika/config/fetchers-duplicate-config.xml
@@ -17,13 +17,13 @@
 -->
 <properties>
   <fetchers>
-    <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
       <params>
           <name>fs1</name>
           <basePath>/my/base/path1</basePath>
       </params>
     </fetcher>
-    <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
       <params>
         <name>fs1</name>
           <basePath>/my/base/path2</basePath>
diff --git a/tika-core/src/test/resources/org/apache/tika/config/fetchers-nobasepath-config.xml b/tika-core/src/test/resources/org/apache/tika/config/fetchers-nobasepath-config.xml
index 2396ccc..0cd79e4 100644
--- a/tika-core/src/test/resources/org/apache/tika/config/fetchers-nobasepath-config.xml
+++ b/tika-core/src/test/resources/org/apache/tika/config/fetchers-nobasepath-config.xml
@@ -17,13 +17,13 @@
 -->
 <properties>
   <fetchers>
-    <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
       <params>
         <name>fs1</name>
         <basePath>/my/base/path1</basePath>
       </params>
     </fetcher>
-    <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
       <params>
         <name>fs2</name>
       </params>
diff --git a/tika-core/src/test/resources/org/apache/tika/config/fetchers-noname-config.xml b/tika-core/src/test/resources/org/apache/tika/config/fetchers-noname-config.xml
index cd17e42..d07aacc 100644
--- a/tika-core/src/test/resources/org/apache/tika/config/fetchers-noname-config.xml
+++ b/tika-core/src/test/resources/org/apache/tika/config/fetchers-noname-config.xml
@@ -17,13 +17,13 @@
 -->
 <properties>
   <fetchers>
-    <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
       <params>
         <name>fs1</name>
         <basePath>/my/base/path1</basePath>
       </params>
     </fetcher>
-    <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
       <params>
         <basePath>/my/base/path2</basePath>
       </params>
diff --git a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
index 62e692f..012d0c6 100644
--- a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
+++ b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
@@ -28,7 +28,7 @@
     </pipesIterator>
   </pipesIterators>
   <fetchers>
-    <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
       <params>
         <name>fs</name>
         <basePath>fix</basePath>
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
index e82ff55..f6d37de 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
@@ -102,7 +102,7 @@
 
         TIKA_CONFIG_XML =
                 "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>" + "<fetchers>" +
-                        "<fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
+                        "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
                         "<params>" + "<name>fsf</name>" +
                         "<basePath>" + inputDir.toAbsolutePath() +
                         "</basePath>" + "</params>" + "</fetcher>" + "</fetchers>" + "<emitters>" +
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index 2724c70..841d8b3 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -100,7 +100,7 @@
 
         TIKA_CONFIG_XML =
                 "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>" + "<fetchers>" +
-                        "<fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
+                        "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
                         "<params>" + "<name>" + FETCHER_NAME +
                         "</name>" + "<basePath>" +
                         inputDir.toAbsolutePath() + "</basePath>" + "</params>" + "</fetcher>" +
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
index 5ff0f11..8aba852 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
@@ -83,7 +83,7 @@
         TIKA_CONFIG_TIMEOUT = TMP_DIR.resolve("tika-config-timeout.xml");
         //TODO -- clean this up so that port is sufficient and we don't need portString
         String xml1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>" + "<fetchers>" +
-                "<fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" + "<params>" +
+                "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" + "<params>" +
                 "<name>" + FETCHER_NAME + "</name>" +
                 "<basePath>" + inputDir.toAbsolutePath() +
                 "</basePath>" + "</params>" + "</fetcher>" + "</fetchers>" + "<emitters>" +