blob: 841d8b3790df3913a8fac6749c8eaae2fb5e015d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.server.core;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import javax.ws.rs.core.Response;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.cxf.jaxrs.client.WebClient;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.HandlerConfig;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.utils.ProcessUtils;
/**
 * Development-time integration test for the tika-server {@code /async} endpoint.
 * <p>
 * Before the class runs, mock input files are copied into a temporary input directory and a
 * tika-config.xml is written that wires up a {@code FileSystemFetcher} (reading from the input
 * directory) and a {@code FileSystemEmitter} (writing to a temporary output directory).
 * {@link #testBasic()} then starts the server in a background thread, posts one
 * {@link FetchEmitTuple} per input file and polls the output directory until the expected
 * number of entries shows up or a timeout elapses.
 * <p>
 * NOTE: the number of {@code system_exit.xml} copies is chosen with an unseeded {@link Random},
 * so the exact contents of the file list differ from run to run.
 */
@Ignore("useful for development...need to turn it into a real unit test")
public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {

    private static final Logger LOG = LoggerFactory.getLogger(TikaServerAsyncIntegrationTest.class);

    /** Number of copies made of each always-included mock file. */
    private static final int NUM_FILES = 100;

    /** Upper bound on how long {@link #testBasic()} waits for emitted output. */
    private static final long MAX_WAIT_MILLIS = 5 * 60 * 1000L;

    /** Pause between two polls of the output directory. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    private static final String EMITTER_NAME = "fse";
    private static final String FETCHER_NAME = "fsf";

    /** ObjectMapper instances are reusable; no need to build one per request. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static final FetchEmitTuple.ON_PARSE_EXCEPTION ON_PARSE_EXCEPTION =
            FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;

    private static Path TMP_DIR;
    private static Path TMP_OUTPUT_DIR;
    private static String TIKA_CONFIG_XML;
    private static Path TIKA_CONFIG;

    /** Names (relative to the input directory) of every file copied during class set-up. */
    private static final List<String> FILE_LIST = new ArrayList<>();

    private static final String[] FILES = new String[]{
            "hello_world.xml",
            "null_pointer.xml",
            // "heavy_hang_30000.xml", "real_oom.xml",
            "system_exit.xml"
    };

    /**
     * Creates the temporary input/output directories, populates the input directory with
     * copies of the mock files and writes the tika-config.xml used by the server.
     *
     * @throws Exception if a directory or file cannot be created or a mock resource is missing
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TMP_DIR = Files.createTempDirectory("tika-emitter-test-");
        Path inputDir = TMP_DIR.resolve("input");
        TMP_OUTPUT_DIR = TMP_DIR.resolve("output");
        Files.createDirectories(inputDir);
        Files.createDirectories(TMP_OUTPUT_DIR);

        Random rand = new Random();
        for (int i = 0; i < NUM_FILES; i++) {
            for (String mockFile : FILES) {
                // system_exit.xml is only included in roughly 10% of the iterations
                if (mockFile.equals("system_exit.xml") && rand.nextFloat() > 0.1) {
                    continue;
                }
                String targetName = i + "-" + mockFile;
                copyMockFile(mockFile, inputDir.resolve(targetName));
                FILE_LIST.add(targetName);
            }
        }

        TIKA_CONFIG = TMP_DIR.resolve("tika-config.xml");
        TIKA_CONFIG_XML =
                "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>" + "<fetchers>" +
                "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
                "<params>" + "<name>" + FETCHER_NAME +
                "</name>" + "<basePath>" +
                inputDir.toAbsolutePath() + "</basePath>" + "</params>" + "</fetcher>" +
                "</fetchers>" + "<emitters>" +
                "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
                "<params>" + "<name>" + EMITTER_NAME +
                "</name>" +
                "<basePath>" +
                TMP_OUTPUT_DIR.toAbsolutePath() + "</basePath>" + "</params>" +
                "</emitter>" +
                "</emitters>" +
                "<server><params><endpoints><endpoint>async</endpoint></endpoints>" +
                "<enableUnsecureFeatures>true</enableUnsecureFeatures></params></server>" +
                "<async><params><tikaConfig>" +
                ProcessUtils.escapeCommandLine(TIKA_CONFIG.toAbsolutePath().toString()) +
                "</tikaConfig><numClients>10</numClients><forkedJvmArgs><arg>-Xmx256m" +
                "</arg></forkedJvmArgs><timeoutMillis>5000</timeoutMillis>" +
                "</params></async>" +
                "</properties>";
        FileUtils.write(TIKA_CONFIG.toFile(), TIKA_CONFIG_XML, UTF_8);
    }

    /**
     * Copies one mock document from the test classpath to {@code target}, closing the
     * resource stream afterwards.
     *
     * @param mockFile file name below {@code /test-documents/mock/}
     * @param target   destination path
     * @throws IOException if the resource is not on the classpath or the copy fails
     */
    private static void copyMockFile(String mockFile, Path target) throws IOException {
        String resource = "/test-documents/mock/" + mockFile;
        try (InputStream is = TikaPipesTest.class.getResourceAsStream(resource)) {
            if (is == null) {
                throw new IOException("couldn't find test resource: " + resource);
            }
            Files.copy(is, target);
        }
    }

    /**
     * Removes the temporary directory tree created by {@link #setUpBeforeClass()}.
     *
     * @throws Exception if the directory cannot be deleted
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        // set-up may have failed before the directory was created
        if (TMP_DIR != null) {
            FileUtils.deleteDirectory(TMP_DIR.toFile());
        }
    }

    /**
     * Empties the output directory so that {@link #countTargets()} only sees entries
     * written by the test that is about to run.
     *
     * @throws Exception if the output directory cannot be cleaned
     */
    @Before
    public void setUpEachTest() throws Exception {
        FileUtils.cleanDirectory(TMP_OUTPUT_DIR.toFile());
        assertFalse("output directory should be empty before each test", countTargets() > 0);
    }

    /**
     * Starts the server, submits every input file to the async endpoint and waits until the
     * expected number of entries exists in the output directory.
     *
     * @throws Exception on any failure talking to the server or reading the output directory
     */
    @Test
    public void testBasic() throws Exception {
        Thread serverThread = new Thread(() -> TikaServerCli.main(new String[]{
                //for debugging/development, use no fork; otherwise go with the default
                //"-noFork",
                "-p", INTEGRATION_TEST_PORT, "-config",
                TIKA_CONFIG.toAbsolutePath().toString()}));
        serverThread.start();
        try {
            long start = System.currentTimeMillis();
            JsonNode response = sendAsync(FILE_LIST);
            JsonNode statusNode = response.get("status");
            String status = (statusNode == null) ? null : statusNode.asText();
            if (!"ok".equals(status)) {
                fail("bad status: '" + status + "' -> " + response.toPrettyString());
            }

            // hello_world.xml and null_pointer.xml are each copied NUM_FILES times; the
            // system_exit.xml copies are not counted here -- presumably they never yield
            // output (TODO confirm).
            int expectedTargets = NUM_FILES * 2;
            long deadline = System.currentTimeMillis() + MAX_WAIT_MILLIS;
            int targets = countTargets();
            while (targets < expectedTargets) {
                // bound the wait so a broken server fails the test instead of hanging it
                if (System.currentTimeMillis() > deadline) {
                    fail("timed out after " + MAX_WAIT_MILLIS + " ms waiting for " +
                            expectedTargets + " emitted files; found " + targets);
                }
                Thread.sleep(POLL_INTERVAL_MILLIS);
                targets = countTargets();
            }
            LOG.debug("elapsed: {} ms", System.currentTimeMillis() - start);
        } finally {
            serverThread.interrupt();
        }
    }

    /**
     * Counts the entries currently present in the output directory.
     *
     * @return the number of entries, or 0 if the directory cannot be listed
     */
    private int countTargets() {
        // File.list() returns null on I/O error or if the path is not a directory
        String[] names = TMP_OUTPUT_DIR.toFile().list();
        return (names == null) ? 0 : names.length;
    }

    /**
     * Waits for the server to come up, posts one fetch-emit tuple per file name to the
     * {@code /async} endpoint and parses the JSON response.
     *
     * @param fileNames names of the files to fetch, relative to the fetcher's base path
     * @return the parsed JSON response body
     * @throws Exception if the request fails or the response cannot be parsed
     */
    private JsonNode sendAsync(List<String> fileNames) throws Exception {
        awaitServerStartup();
        List<FetchEmitTuple> tuples = new ArrayList<>(fileNames.size());
        for (String f : fileNames) {
            tuples.add(getFetchEmitTuple(f));
        }
        String json = JsonFetchEmitTupleList.toJson(tuples);

        Response response =
                WebClient.create(endPoint + "/async").accept("application/json").post(json);
        try (Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8)) {
            return OBJECT_MAPPER.readTree(reader);
        }
    }

    /**
     * Builds the tuple for one input file: the file name serves as both tuple id and fetch
     * key, the emit key is left empty, and default handler settings are used.
     *
     * @param fileName name of the file relative to the fetcher's base path
     * @return the tuple to submit
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    private FetchEmitTuple getFetchEmitTuple(String fileName) throws IOException {
        return new FetchEmitTuple(fileName, new FetchKey(FETCHER_NAME, fileName),
                new EmitKey(EMITTER_NAME, ""), new Metadata(), HandlerConfig.DEFAULT_HANDLER_CONFIG,
                ON_PARSE_EXCEPTION);
    }
}