blob: 8c5a1531ad9c78a00580dda53c04a6a18a68010e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.server.client;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
/**
 * Minimal command-line driver that reads {@link FetchEmitTuple}s from a
 * {@link PipesIterator} and hands them to a pool of worker threads, each of
 * which calls {@link TikaClient#parse} on the tuples it takes from a shared
 * bounded queue.
 *
 * <p>Expected arguments: {@code <tikaConfigPath> <numThreads> <comma-separated server urls>}.
 */
public class TikaClientCLI {

    private static final Logger LOGGER = LoggerFactory.getLogger(TikaClientCLI.class);

    /** Capacity of the bounded queue shared by the iterator task and the workers. */
    private static final int QUEUE_SIZE = 10000;

    /** Maximum time, in milliseconds, that a single queue/completion poll waits. */
    private final long maxWaitMs = 300000;

    /**
     * Entry point.
     *
     * @param args {@code args[0]} tika config path, {@code args[1]} number of worker
     *             threads, {@code args[2]} comma-separated list of tika server urls
     * @throws IllegalArgumentException if fewer than three arguments are supplied
     * @throws Exception on any failure while configuring or running the workers
     */
    public static void main(String[] args) throws Exception {
        //TODO -- add an actual commandline,
        if (args == null || args.length < 3) {
            // fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException
            throw new IllegalArgumentException(
                    "Expected 3 arguments: <tikaConfigPath> <numThreads> " +
                            "<comma-separated tika server urls>");
        }
        Path tikaConfigPath = Paths.get(args[0]);
        int numThreads = Integer.parseInt(args[1]);
        List<String> tikaServerUrls = Arrays.asList(args[2].split(","));
        TikaClientCLI cli = new TikaClientCLI();
        cli.execute(tikaConfigPath, tikaServerUrls, numThreads);
    }

    /**
     * Starts one iterator task plus {@code numThreads} fetch workers and waits
     * until all of them have completed.
     *
     * <p>When the number of server urls equals {@code numThreads}, worker {@code i}
     * gets a client built from url {@code i} only; otherwise every worker gets a
     * client built from the full url list.
     *
     * @param tikaConfigPath path to the tika config used for both the
     *                       {@link TikaConfig} and the {@link PipesIterator}
     * @param tikaServerUrls tika server urls
     * @param numThreads     number of fetch workers; must be at least 1
     * @throws IllegalArgumentException if {@code numThreads} is less than 1
     * @throws RuntimeException if the calling thread is interrupted or any task fails
     */
    private void execute(Path tikaConfigPath, List<String> tikaServerUrls, int numThreads)
            throws TikaException, IOException, SAXException {
        if (numThreads < 1) {
            // with no workers nothing would ever drain the queue
            throw new IllegalArgumentException("numThreads must be >= 1: " + numThreads);
        }
        TikaConfig config = new TikaConfig(tikaConfigPath);
        // one extra thread for the task that feeds the queue
        ExecutorService executorService = Executors.newFixedThreadPool(numThreads + 1);
        try {
            ExecutorCompletionService<Integer> completionService =
                    new ExecutorCompletionService<>(executorService);
            final PipesIterator pipesIterator = PipesIterator.build(tikaConfigPath);
            final ArrayBlockingQueue<FetchEmitTuple> queue =
                    new ArrayBlockingQueue<>(QUEUE_SIZE);

            completionService.submit(new PipesIteratorWrapper(pipesIterator, queue, numThreads));

            if (tikaServerUrls.size() == numThreads) {
                // one dedicated server per worker
                for (int i = 0; i < numThreads; i++) {
                    TikaClient client = TikaClient.get(config,
                            Collections.singletonList(tikaServerUrls.get(i)));
                    completionService.submit(new FetchWorker(queue, client));
                }
            } else {
                // counts differ: every worker is handed the whole list
                logDiffSizes(tikaServerUrls.size(), numThreads);
                for (int i = 0; i < numThreads; i++) {
                    TikaClient client = TikaClient.get(config, tikaServerUrls);
                    completionService.submit(new FetchWorker(queue, client));
                }
            }

            awaitCompletion(completionService, numThreads + 1);
        } finally {
            // pool threads are non-daemon; without this the JVM cannot exit,
            // and on failure the remaining workers would keep running
            executorService.shutdownNow();
        }
    }

    /**
     * Blocks until {@code expected} tasks have completed. A poll that times out
     * after {@link #maxWaitMs} is simply retried.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *                          {@link ExecutionException} that stopped the wait
     */
    private void awaitCompletion(ExecutorCompletionService<Integer> completionService,
                                 int expected) {
        int finished = 0;
        while (finished < expected) {
            Future<Integer> future;
            try {
                future = completionService.poll(maxWaitMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                //stop the world
                Thread.currentThread().interrupt();
                LOGGER.error("", e);
                throw new RuntimeException(e);
            }
            if (future == null) {
                // nothing finished within maxWaitMs; keep waiting
                continue;
            }
            finished++;
            try {
                future.get();
            } catch (InterruptedException e) {
                //stop the world
                Thread.currentThread().interrupt();
                LOGGER.error("", e);
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                //stop the world
                LOGGER.error("", e);
                throw new RuntimeException(e);
            }
        }
    }

    /** Logs that the number of servers and the number of threads differ. */
    private void logDiffSizes(int servers, int numThreads) {
        LOGGER.info("tika server count ({}) != numThreads ({}). " +
                "Each client will randomly select a server from this list", servers, numThreads);
    }

    /**
     * Worker that accumulates tuples from the queue into a local batch and passes
     * the batch to {@link #send(List)}. {@code send} is currently an empty
     * placeholder, and nothing in this file instantiates this class.
     */
    private class AsyncFetchWorker implements Callable<Integer> {
        private final ArrayBlockingQueue<FetchEmitTuple> queue;
        private final TikaClient client;

        public AsyncFetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, TikaClient client) {
            this.queue = queue;
            this.client = client;
        }

        /**
         * @return 1 once the completion marker has been taken from the queue
         * @throws TimeoutException if no tuple arrives within {@code maxWaitMs}
         */
        @Override
        public Integer call() throws Exception {
            List<FetchEmitTuple> localCache = new ArrayList<>();
            while (true) {
                FetchEmitTuple t = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
                if (t == null) {
                    // flush what we have before giving up
                    send(localCache);
                    throw new TimeoutException("exceeded maxWaitMs");
                }
                // identity comparison is intentional: the marker is a shared sentinel
                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                    send(localCache);
                    return 1;
                }
                if (localCache.size() > 20) {
                    LOGGER.debug("about to send: {}", localCache.size());
                    send(localCache);
                    localCache.clear();
                }
                localCache.add(t);
            }
        }

        /** Placeholder; currently does nothing with the batch. */
        private void send(List<FetchEmitTuple> localCache) {
        }
    }

    /**
     * Worker that takes tuples from the queue one at a time and calls
     * {@link TikaClient#parse} on each until it sees the completion marker.
     */
    private class FetchWorker implements Callable<Integer> {
        private final ArrayBlockingQueue<FetchEmitTuple> queue;
        private final TikaClient client;

        public FetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, TikaClient client) {
            this.queue = queue;
            this.client = client;
        }

        /**
         * @return 1 once the completion marker has been taken from the queue
         * @throws TimeoutException if no tuple arrives within {@code maxWaitMs}
         */
        @Override
        public Integer call() throws Exception {
            while (true) {
                FetchEmitTuple t = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
                if (t == null) {
                    throw new TimeoutException("exceeded maxWaitMs");
                }
                // identity comparison is intentional: the marker is a shared sentinel
                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                    return 1;
                }
                try {
                    LOGGER.debug("about to parse: {}", t.getFetchKey());
                    client.parse(t);
                } catch (IOException | TikaException e) {
                    // a failure on one tuple is logged and does not stop the worker
                    LOGGER.warn(t.getFetchKey().toString(), e);
                }
            }
        }
    }

    /**
     * Task that copies every tuple from the {@link PipesIterator} onto the queue
     * and then enqueues one completion marker per worker thread.
     */
    private static class PipesIteratorWrapper implements Callable<Integer> {
        private final PipesIterator pipesIterator;
        private final ArrayBlockingQueue<FetchEmitTuple> queue;
        private final int numThreads;

        public PipesIteratorWrapper(PipesIterator pipesIterator,
                                    ArrayBlockingQueue<FetchEmitTuple> queue,
                                    int numThreads) {
            this.pipesIterator = pipesIterator;
            this.queue = queue;
            this.numThreads = numThreads;
        }

        /** @return 1 after all tuples and completion markers have been enqueued */
        @Override
        public Integer call() throws Exception {
            for (FetchEmitTuple t : pipesIterator) {
                //potentially blocks forever
                queue.put(t);
            }
            // one marker per worker so that each of them terminates
            for (int i = 0; i < numThreads; i++) {
                queue.put(PipesIterator.COMPLETED_SEMAPHORE);
            }
            return 1;
        }
    }
}