blob: 5b602a94c90dbe8d642459e0c3ea81fcbe051fde [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.async;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.file.Counters;
import org.apache.commons.io.file.PathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.PipesClient;
import org.apache.tika.pipes.PipesException;
import org.apache.tika.pipes.PipesReporter;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
import org.apache.tika.pipes.pipesiterator.TotalCountResult;
import org.apache.tika.pipes.pipesiterator.TotalCounter;
/**
* This is the main class for handling async requests. This manages
* AsyncClients and AsyncEmitters.
*
*/
public class AsyncProcessor implements Closeable {
static final int PARSER_FUTURE_CODE = 1;
static final int WATCHER_FUTURE_CODE = 3;
private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class);
private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
private final ArrayBlockingQueue<EmitData> emitData;
private final ExecutorCompletionService<Integer> executorCompletionService;
private final ExecutorService executorService;
private final AsyncConfig asyncConfig;
private final AtomicLong totalProcessed = new AtomicLong(0);
private static long MAX_OFFER_WAIT_MS = 120000;
private volatile int numParserThreadsFinished = 0;
private volatile int numEmitterThreadsFinished = 0;
private boolean addedEmitterSemaphores = false;
boolean isShuttingDown = false;
public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException {
this(tikaConfigPath, null);
}
public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator) throws TikaException, IOException {
this.asyncConfig = AsyncConfig.load(tikaConfigPath);
this.fetchEmitTuples = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
this.emitData = new ArrayBlockingQueue<>(100);
//+1 is the watcher thread
this.executorService = Executors.newFixedThreadPool(
asyncConfig.getNumClients() + asyncConfig.getNumEmitters() + 1);
this.executorCompletionService =
new ExecutorCompletionService<>(executorService);
try {
if (!tikaConfigPath.toAbsolutePath().equals(asyncConfig.getTikaConfig().toAbsolutePath())) {
LOG.warn("TikaConfig for AsyncProcessor ({}) is different " +
"from TikaConfig for workers ({}). If this is intended," +
" please ignore this warning.", tikaConfigPath.toAbsolutePath(),
asyncConfig.getTikaConfig().toAbsolutePath());
}
this.executorCompletionService.submit(() -> {
while (true) {
try {
Thread.sleep(500);
checkActive();
} catch (InterruptedException e) {
return WATCHER_FUTURE_CODE;
}
}
});
//this is run in a daemon thread
if (pipesIterator != null && (pipesIterator instanceof TotalCounter)) {
LOG.debug("going to total counts");
startCounter((TotalCounter) pipesIterator);
}
for (int i = 0; i < asyncConfig.getNumClients(); i++) {
executorCompletionService.submit(
new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitData));
}
EmitterManager emitterManager = EmitterManager.load(asyncConfig.getTikaConfig());
for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
executorCompletionService.submit(
new AsyncEmitter(asyncConfig, emitData, emitterManager));
}
} catch (Exception e) {
LOG.error("problem initializing AsyncProcessor", e);
executorService.shutdownNow();
asyncConfig.getPipesReporter().error(e);
throw e;
}
}
private void startCounter(TotalCounter totalCounter) {
Thread counterThread = new Thread(() -> {
totalCounter.startTotalCount();
PipesReporter pipesReporter = asyncConfig.getPipesReporter();
TotalCountResult.STATUS status = totalCounter.getTotalCount().getStatus();
while (status == TotalCountResult.STATUS.NOT_COMPLETED) {
try {
Thread.sleep(500);
TotalCountResult result = totalCounter.getTotalCount();
LOG.trace("counter total {} {} ", result.getStatus(), result.getTotalCount());
pipesReporter.report(result);
status = result.getStatus();
} catch (InterruptedException e) {
return;
}
}
});
counterThread.setDaemon(true);
counterThread.start();
}
public synchronized boolean offer(List<FetchEmitTuple> newFetchEmitTuples, long offerMs)
throws PipesException, InterruptedException {
if (isShuttingDown) {
throw new IllegalStateException(
"Can't call offer after calling close() or " + "shutdownNow()");
}
if (newFetchEmitTuples.size() > asyncConfig.getQueueSize()) {
throw new OfferLargerThanQueueSize(newFetchEmitTuples.size(),
asyncConfig.getQueueSize());
}
long start = System.currentTimeMillis();
long elapsed = System.currentTimeMillis() - start;
while (elapsed < offerMs) {
if (fetchEmitTuples.remainingCapacity() > newFetchEmitTuples.size()) {
try {
fetchEmitTuples.addAll(newFetchEmitTuples);
return true;
} catch (IllegalStateException e) {
//this means that the add all failed because the queue couldn't
//take the full list
LOG.debug("couldn't add full list", e);
}
}
Thread.sleep(100);
elapsed = System.currentTimeMillis() - start;
}
return false;
}
public int getCapacity() {
return fetchEmitTuples.remainingCapacity();
}
public synchronized boolean offer(FetchEmitTuple t, long offerMs)
throws PipesException, InterruptedException {
if (fetchEmitTuples == null) {
throw new IllegalStateException("queue hasn't been initialized yet.");
} else if (isShuttingDown) {
throw new IllegalStateException(
"Can't call offer after calling close() or " + "shutdownNow()");
}
checkActive();
return fetchEmitTuples.offer(t, offerMs, TimeUnit.MILLISECONDS);
}
public void finished() throws InterruptedException {
for (int i = 0; i < asyncConfig.getNumClients(); i++) {
boolean offered = fetchEmitTuples.offer(PipesIterator.COMPLETED_SEMAPHORE,
MAX_OFFER_WAIT_MS, TimeUnit.MILLISECONDS);
if (! offered) {
throw new RuntimeException("Couldn't offer completed semaphore within " +
MAX_OFFER_WAIT_MS + " ms");
}
}
}
public synchronized boolean checkActive() throws InterruptedException {
Future<Integer> future = executorCompletionService.poll();
if (future != null) {
try {
Integer i = future.get();
switch (i) {
case PARSER_FUTURE_CODE :
numParserThreadsFinished++;
LOG.debug("fetchEmitWorker finished, total {}", numParserThreadsFinished);
break;
case AsyncEmitter.EMITTER_FUTURE_CODE :
numEmitterThreadsFinished++;
LOG.debug("emitter thread finished, total {}", numEmitterThreadsFinished);
break;
case WATCHER_FUTURE_CODE :
LOG.debug("watcher thread finished");
break;
default :
throw new IllegalArgumentException("Don't recognize this future code: " + i);
}
} catch (ExecutionException e) {
LOG.error("execution exception", e);
asyncConfig.getPipesReporter().error(e);
throw new RuntimeException(e);
}
}
if (numParserThreadsFinished == asyncConfig.getNumClients() && ! addedEmitterSemaphores) {
for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
try {
boolean offered = emitData.offer(AsyncEmitter.EMIT_DATA_STOP_SEMAPHORE,
MAX_OFFER_WAIT_MS,
TimeUnit.MILLISECONDS);
if (! offered) {
throw new RuntimeException("Couldn't offer emit data stop semaphore " +
"within " + MAX_OFFER_WAIT_MS + " ms");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
addedEmitterSemaphores = true;
}
return !(numParserThreadsFinished == asyncConfig.getNumClients() &&
numEmitterThreadsFinished == asyncConfig.getNumEmitters());
}
@Override
public void close() throws IOException {
executorService.shutdownNow();
asyncConfig.getPipesReporter().close();
if (Files.isDirectory(asyncConfig.getPipesTmpDir())) {
try {
LOG.debug("about to delete the full async temp directory: {}",
asyncConfig.getPipesTmpDir().toAbsolutePath());
Counters.PathCounters pathCounters = PathUtils.deleteDirectory(asyncConfig.getPipesTmpDir());
LOG.debug("Successfully deleted {} temporary files in {} directories",
pathCounters.getFileCounter().get(),
pathCounters.getDirectoryCounter().get());
} catch (IllegalArgumentException e) {
throw new IOException(e);
}
}
}
public long getTotalProcessed() {
return totalProcessed.get();
}
private class FetchEmitWorker implements Callable<Integer> {
private final AsyncConfig asyncConfig;
private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
private final ArrayBlockingQueue<EmitData> emitDataQueue;
private FetchEmitWorker(AsyncConfig asyncConfig,
ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples,
ArrayBlockingQueue<EmitData> emitDataQueue) {
this.asyncConfig = asyncConfig;
this.fetchEmitTuples = fetchEmitTuples;
this.emitDataQueue = emitDataQueue;
}
@Override
public Integer call() throws Exception {
try (PipesClient pipesClient = new PipesClient(asyncConfig)) {
while (true) {
FetchEmitTuple t = fetchEmitTuples.poll(1, TimeUnit.SECONDS);
if (t == null) {
//skip
if (LOG.isTraceEnabled()) {
LOG.trace("null fetch emit tuple");
}
} else if (t == PipesIterator.COMPLETED_SEMAPHORE) {
if (LOG.isTraceEnabled()) {
LOG.trace("hit completed semaphore");
}
return PARSER_FUTURE_CODE;
} else {
PipesResult result = null;
long start = System.currentTimeMillis();
try {
result = pipesClient.process(t);
} catch (IOException e) {
LOG.warn("pipesClient crash", e);
result = PipesResult.UNSPECIFIED_CRASH;
}
if (LOG.isTraceEnabled()) {
LOG.trace("timer -- pipes client process: {} ms",
System.currentTimeMillis() - start);
}
long offerStart = System.currentTimeMillis();
if (shouldEmit(result)) {
LOG.trace("adding result to emitter queue: " + result.getEmitData());
boolean offered = emitDataQueue.offer(result.getEmitData(),
MAX_OFFER_WAIT_MS,
TimeUnit.MILLISECONDS);
if (! offered) {
throw new RuntimeException("Couldn't offer emit data to queue " +
"within " + MAX_OFFER_WAIT_MS + " ms");
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("timer -- offered: {} ms",
System.currentTimeMillis() - offerStart);
}
long elapsed = System.currentTimeMillis() - start;
asyncConfig.getPipesReporter().report(t, result, elapsed);
totalProcessed.incrementAndGet();
}
}
}
}
private boolean shouldEmit(PipesResult result) {
if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS ||
result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION) {
return true;
}
return result.isIntermediate() && asyncConfig.isEmitIntermediateResults();
}
}
}