blob: a8a4988088a133c68fa73c2b1adec821c57aea45 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.pipesiterator.fs;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.core.FetchEmitTuple;
import org.apache.tika.pipes.core.HandlerConfig;
import org.apache.tika.pipes.core.async.AsyncProcessor;
import org.apache.tika.pipes.core.emitter.EmitKey;
import org.apache.tika.pipes.core.fetcher.FetchKey;
import org.apache.tika.pipes.core.pipesiterator.PipesIterator;
import org.apache.tika.pipes.core.pipesiterator.TotalCountResult;
import org.apache.tika.pipes.core.pipesiterator.TotalCounter;
/**
 * A {@link PipesIterator} that walks the file tree under a configured base
 * directory and enqueues one {@link FetchEmitTuple} per file it visits. The
 * fetch key and emit key of each tuple are the file's path relative to the
 * base directory.
 * <p>
 * If {@code countTotal} is set to {@code true}, a second, independent walk of
 * the same tree can be started on a background daemon thread via
 * {@link #startTotalCount()} to count the files; the running or final count
 * is available through {@link #getTotalCount()}.
 */
public class FileSystemPipesIterator extends PipesIterator
        implements TotalCounter, Initializable, Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(FileSystemPipesIterator.class);

    /** Root directory of the walk. */
    private Path basePath;

    /** Whether a background file count was requested. */
    private boolean countTotal = false;

    /** Created in {@link #initialize(Map)} only when {@link #countTotal} is true. */
    private FileCountWorker fileCountWorker;

    public FileSystemPipesIterator() {
    }

    /**
     * @param basePath root directory of the walk
     */
    public FileSystemPipesIterator(Path basePath) {
        this.basePath = basePath;
    }

    /**
     * @param basePath root directory of the walk, converted with {@link Paths#get}
     */
    @Field
    public void setBasePath(String basePath) {
        this.basePath = Paths.get(basePath);
    }

    /**
     * Walks the tree under the base path and offers a tuple for every file.
     *
     * @throws IllegalArgumentException if the base path is not an existing directory
     * @throws TimeoutException if adding a tuple timed out; the visitor
     *         tunnels this through an {@link IOException}, which is unwrapped here
     * @throws IOException on any other failure reported by the walk
     */
    @Override
    protected void enqueue() throws InterruptedException, IOException, TimeoutException {
        if (!Files.isDirectory(basePath)) {
            throw new IllegalArgumentException(
                    "\"basePath\" directory does not exist: " + basePath.toAbsolutePath());
        }

        try {
            Files.walkFileTree(basePath, new FSFileVisitor(getFetcherName(), getEmitterName()));
        } catch (IOException e) {
            // FileVisitor methods may only throw IOException, so FSFileVisitor
            // wraps a TimeoutException as the cause; restore the original type.
            Throwable cause = e.getCause();
            if (cause instanceof TimeoutException) {
                throw (TimeoutException) cause;
            }
            throw e;
        }
    }

    /**
     * Validates that the base path, fetcher name and emitter name were configured.
     */
    @Override
    public void checkInitialization(InitializableProblemHandler problemHandler)
            throws TikaConfigException {
        //these should all be fatal
        TikaConfig.mustNotBeEmpty("basePath", basePath);
        TikaConfig.mustNotBeEmpty("fetcherName", getFetcherName());
        TikaConfig.mustNotBeEmpty("emitterName", getEmitterName());
    }

    /**
     * Creates the background count worker when counting was requested.
     */
    @Override
    public void initialize(Map<String, Param> params) throws TikaConfigException {
        if (countTotal) {
            fileCountWorker = new FileCountWorker(basePath);
        }
    }

    /**
     * @param countTotal {@code true} to enable the background file count
     */
    @Field
    public void setCountTotal(boolean countTotal) {
        this.countTotal = countTotal;
    }

    /**
     * Starts the background count; does nothing if counting was not requested.
     */
    @Override
    public void startTotalCount() {
        if (!countTotal) {
            return;
        }
        fileCountWorker.startTotalCount();
    }

    /**
     * @return {@link TotalCountResult#UNSUPPORTED} if counting was not
     *         requested, otherwise the worker's current or final result
     */
    @Override
    public TotalCountResult getTotalCount() {
        if (!countTotal) {
            return TotalCountResult.UNSUPPORTED;
        }
        return fileCountWorker.getTotalCount();
    }

    /**
     * Closes the count worker if one was created.
     */
    @Override
    public void close() throws IOException {
        if (fileCountWorker != null) {
            fileCountWorker.close();
        }
    }

    /**
     * Visitor that turns every visited file into a {@link FetchEmitTuple}.
     * Not static: it relies on the enclosing iterator's base path, handler
     * config and queue.
     */
    private class FSFileVisitor implements FileVisitor<Path> {

        private final String fetcherName;
        private final String emitterName;

        private FSFileVisitor(String fetcherName, String emitterName) {
            this.fetcherName = fetcherName;
            this.emitterName = emitterName;
        }

        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
                throws IOException {
            return FileVisitResult.CONTINUE;
        }

        /**
         * Enqueues a tuple keyed by the file's path relative to the base path.
         *
         * @throws IOException wrapping a {@link TimeoutException} from the add
         */
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            String relPath = basePath.relativize(file).toString();

            try {
                ParseContext parseContext = new ParseContext();
                parseContext.set(HandlerConfig.class, getHandlerConfig());
                tryToAdd(new FetchEmitTuple(relPath, new FetchKey(fetcherName, relPath),
                        new EmitKey(emitterName, relPath), new Metadata(), parseContext,
                        getOnParseException()));
            } catch (TimeoutException e) {
                // only IOException may escape a FileVisitor; enqueue() unwraps it
                throw new IOException(e);
            } catch (InterruptedException e) {
                // stop the walk quietly; note that the interrupt flag is not
                // restored here, so walkFileTree returns normally to enqueue()
                return FileVisitResult.TERMINATE;
            }
            return FileVisitResult.CONTINUE;
        }

        /**
         * Logs the failure and keeps walking so one unreadable entry does not
         * abort the whole iteration.
         */
        @Override
        public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
            LOG.warn("couldn't visit file: {}", file, exc);
            return FileVisitResult.CONTINUE;
        }

        @Override
        public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
            return FileVisitResult.CONTINUE;
        }
    }

    /**
     * Counts the files under a base path on a daemon thread.
     * <p>
     * {@link #status} and {@link #finalResult} are written by the counter
     * thread and read by callers of {@link #getTotalCount()}, so both are
     * {@code volatile}.
     */
    private static class FileCountWorker implements TotalCounter, Closeable {

        /** Null until {@link #startTotalCount()} has been called. */
        private volatile Thread totalCounterThread;

        private final AtomicLong totalCount = new AtomicLong(0);
        private volatile TotalCountResult.STATUS status;

        /** Set once the walk has finished or failed; null while it is running. */
        private volatile TotalCountResult finalResult;

        private final Path basePath;

        public FileCountWorker(Path basePath) {
            this.basePath = basePath;
            this.status = TotalCountResult.STATUS.NOT_COMPLETED;
        }

        /**
         * Starts the counting walk on a new daemon thread.
         */
        @Override
        public void startTotalCount() {
            Thread counter = new Thread(() -> {
                try {
                    Files.walkFileTree(basePath, new FSFileCounter(totalCount));
                    if (Thread.currentThread().isInterrupted()) {
                        // close() was called; the walk may have been cut
                        // short, so do not report the count as completed
                        return;
                    }
                    status = TotalCountResult.STATUS.COMPLETED;
                    finalResult = new TotalCountResult(totalCount.get(), status);
                } catch (IOException e) {
                    LOG.warn("problem counting files", e);
                    status = TotalCountResult.STATUS.EXCEPTION;
                    finalResult = new TotalCountResult(totalCount.get(), status);
                }
            });
            counter.setDaemon(true);
            totalCounterThread = counter;
            counter.start();
        }

        /**
         * @return the final result once the walk has finished or failed,
         *         otherwise a snapshot of the running count and current status
         */
        @Override
        public TotalCountResult getTotalCount() {
            TotalCountResult result = finalResult;
            if (result != null) {
                return result;
            }
            return new TotalCountResult(totalCount.get(), status);
        }

        /**
         * Interrupts the counter thread if it was started; a no-op otherwise.
         */
        @Override
        public void close() throws IOException {
            Thread counter = totalCounterThread;
            if (counter != null) {
                counter.interrupt();
            }
        }

        /**
         * Visitor that increments a shared counter for every visited file and
         * terminates the walk once the visiting thread has been interrupted.
         */
        private static class FSFileCounter implements FileVisitor<Path> {

            private final AtomicLong count;

            private FSFileCounter(AtomicLong count) {
                this.count = count;
            }

            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
                    throws IOException {
                return continueUnlessInterrupted();
            }

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                    throws IOException {
                if (Thread.currentThread().isInterrupted()) {
                    return FileVisitResult.TERMINATE;
                }
                count.incrementAndGet();
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc)
                    throws IOException {
                return FileVisitResult.CONTINUE;
            }

            /** Uses isInterrupted() so the flag stays set for the worker to inspect. */
            private static FileVisitResult continueUnlessInterrupted() {
                return Thread.currentThread().isInterrupted() ? FileVisitResult.TERMINATE :
                        FileVisitResult.CONTINUE;
            }
        }
    }
}