blob: 45a35652957571a662ce773c59401328cda16498 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.pipes.async.AsyncConfig;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.utils.ProcessUtils;
/**
 * Client half of a parent/child process pair. It forks a JVM running
 * {@code org.apache.tika.pipes.PipesServer} and exchanges messages with it over the
 * child's stdin/stdout: a single status/command byte, optionally followed by a
 * 4-byte length and that many payload bytes. The child's stderr is drained on a
 * daemon thread by a {@link LogGobbler}.
 * <p>
 * The child process is started lazily on the first call to
 * {@link #process(FetchEmitTuple)} and is replaced whenever it stops answering pings
 * or has handled the configured maximum number of files.
 * <p>
 * Nothing in this class is synchronized. NOTE(review): presumably each instance is
 * meant to be driven by a single thread -- confirm against callers.
 */
public class PipesClient implements Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(PipesClient.class);

    /** How long to wait for a dying server to exit so that its exit value can be read. */
    private static final long WAIT_FOR_EXIT_MILLIS = 200;

    private final PipesConfigBase pipesConfig;

    private Process process;
    private LogGobbler logGobbler;
    private Thread logGobblerThread;
    private DataOutputStream output;
    private DataInputStream input;

    /** Requests dispatched to the current server process; reset by {@link #restart()}. */
    private int filesProcessed = 0;

    /**
     * Creates a client; no process is forked until the first request.
     *
     * @param pipesConfig configuration used to build the child JVM's command line and
     *                    to decide when the child must be recycled
     */
    public PipesClient(PipesConfigBase pipesConfig) {
        this.pipesConfig = pipesConfig;
    }

    /**
     * @return the number of requests sent to the current server process since it was
     *         last (re)started
     */
    public int getFilesProcessed() {
        return filesProcessed;
    }

    /**
     * Checks whether the server process is alive and echoes a ping byte.
     *
     * @return {@code true} only if the process is running and answered with
     *         {@code PipesServer.PING}; any I/O failure counts as "not alive"
     */
    private boolean ping() {
        if (process == null || !process.isAlive()) {
            return false;
        }
        try {
            output.write(PipesServer.PING);
            output.flush();
            return input.read() == PipesServer.PING;
        } catch (IOException e) {
            // a broken pipe simply means the server is gone; the caller restarts it
            return false;
        }
    }

    /**
     * Forcibly terminates the server process, if one was started.
     */
    @Override
    public void close() throws IOException {
        if (process != null) {
            process.destroyForcibly();
        }
    }

    /**
     * Sends one tuple to the server process and waits for its result, (re)starting
     * the server first if it is not responding or has reached the configured maximum
     * number of files.
     *
     * @param t the tuple to serialize and hand to the server
     * @return the result reported by the server; {@code PipesResult.TIMEOUT} or
     *         {@code PipesResult.UNSPECIFIED_CRASH} if the response could not be read
     * @throws IOException if the server cannot be started or the request cannot be
     *                     serialized or written
     */
    public PipesResult process(FetchEmitTuple t) throws IOException {
        if (!ping()) {
            restart();
        } else if (pipesConfig.getMaxFilesProcessed() > 0 &&
                filesProcessed >= pipesConfig.getMaxFilesProcessed()) {
            LOG.info("restarting server after hitting max files: {}", filesProcessed);
            restart();
        }
        //TODO consider adding a timer here too
        // this could block forever if the watchdog thread in the server fails
        // or is starved
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
            objectOutputStream.writeObject(t);
        }
        byte[] bytes = bos.toByteArray();
        output.write(PipesServer.CALL);
        output.writeInt(bytes.length);
        output.write(bytes);
        output.flush();
        // count every dispatched request, whatever its outcome, so that the
        // max-files check above can actually trigger
        filesProcessed++;
        try {
            return readResults(t);
        } catch (IOException e) {
            return handleServerFailure(t, e);
        }
    }

    /**
     * Cleans up after a failed read of the server's response: gives the process a
     * short grace period to exit, kills it, and maps its exit value to a result.
     */
    private PipesResult handleServerFailure(FetchEmitTuple t, IOException cause) {
        try {
            //wait just a little bit to let process end to get exit value
            process.waitFor(WAIT_FOR_EXIT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // preserve the interrupt for whoever owns this thread
            Thread.currentThread().interrupt();
        } finally {
            process.destroyForcibly();
        }
        // exitValue() throws if the process is still running, hence the isAlive() guard
        if (!process.isAlive() && PipesServer.TIMEOUT_EXIT_CODE == process.exitValue()) {
            LOG.warn("{} timed out", t.getId());
            return PipesResult.TIMEOUT;
        }
        LOG.warn("{} caused an unspecified crash", t.getId(), cause);
        return PipesResult.UNSPECIFIED_CRASH;
    }

    /**
     * Reads the status byte sent by the server and converts it, together with any
     * payload that follows, into a {@link PipesResult}.
     *
     * @throws IOException if the stream fails or the status byte is not recognized
     *                     (including end of stream)
     */
    private PipesResult readResults(FetchEmitTuple t) throws IOException {
        int status = input.read();
        switch (status) {
            case PipesServer.OOM:
                LOG.warn("oom: {}", t.getId());
                return PipesResult.OOM;
            case PipesServer.TIMEOUT:
                LOG.warn("timeout: {}", t.getId());
                return PipesResult.TIMEOUT;
            case PipesServer.EMIT_EXCEPTION:
                LOG.warn("emit exception: {}", t.getId());
                return readMessage(PipesResult.STATUS.EMIT_EXCEPTION);
            case PipesServer.NO_EMITTER_FOUND:
                LOG.warn("no emitter found: {}", t.getId());
                return PipesResult.NO_EMITTER_FOUND;
            case PipesServer.PARSE_SUCCESS:
            case PipesServer.PARSE_EXCEPTION_EMIT:
                LOG.info("parse success: {}", t.getId());
                return deserializeEmitData();
            case PipesServer.PARSE_EXCEPTION_NO_EMIT:
                return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
            case PipesServer.EMIT_SUCCESS:
                LOG.info("emit success: {}", t.getId());
                return PipesResult.EMIT_SUCCESS;
            case PipesServer.EMIT_SUCCESS_PARSE_EXCEPTION:
                return readMessage(PipesResult.STATUS.EMIT_SUCCESS_PARSE_EXCEPTION);
            default:
                throw new IOException("problem reading response from server " + status);
        }
    }

    /**
     * Reads one length-prefixed payload (4-byte length, then that many bytes).
     *
     * @throws IOException if the stream fails, ends early, or announces a negative
     *                     length; reported as an IOException so that the caller's
     *                     crash handling applies instead of a
     *                     NegativeArraySizeException escaping
     */
    private byte[] readPayload() throws IOException {
        int length = input.readInt();
        if (length < 0) {
            throw new IOException("negative payload length from server: " + length);
        }
        byte[] bytes = new byte[length];
        input.readFully(bytes);
        return bytes;
    }

    /**
     * Reads a UTF-8 message payload and pairs it with the given status.
     */
    private PipesResult readMessage(PipesResult.STATUS status) throws IOException {
        String msg = new String(readPayload(), StandardCharsets.UTF_8);
        return new PipesResult(status, msg);
    }

    /**
     * Reads a payload and Java-deserializes it into an {@link EmitData}.
     *
     * @throws RuntimeException if the serialized class cannot be found on this
     *                          side's classpath
     */
    private PipesResult deserializeEmitData() throws IOException {
        byte[] bytes = readPayload();
        try (ObjectInputStream objectInputStream =
                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return new PipesResult((EmitData) objectInputStream.readObject());
        } catch (ClassNotFoundException e) {
            LOG.error("class not found exception deserializing data", e);
            //this should be catastrophic
            throw new RuntimeException(e);
        }
    }

    /**
     * Kills the current server process (if any), forks a new one, and rewires the
     * streams and the stderr gobbler to it. Also resets the per-process file counter.
     */
    private void restart() throws IOException {
        if (process != null) {
            process.destroyForcibly();
            LOG.info("restarting process");
        } else {
            LOG.info("starting process");
        }
        if (logGobblerThread != null) {
            logGobblerThread.interrupt();
        }
        ProcessBuilder pb = new ProcessBuilder(getCommandline());
        process = pb.start();
        // the counter is per server process; without the reset every request after
        // the limit is reached would trigger another restart
        filesProcessed = 0;
        logGobbler = new LogGobbler(process.getErrorStream());
        logGobblerThread = new Thread(logGobbler);
        logGobblerThread.setDaemon(true);
        logGobblerThread.start();
        input = new DataInputStream(process.getInputStream());
        output = new DataOutputStream(process.getOutputStream());
    }

    /**
     * Builds the command line for the child JVM: java binary, default class path and
     * headless flag unless the configured JVM args already supply them, the
     * configured JVM args, the server main class, and the server's four positional
     * arguments (tika config path, max emit batch bytes, timeout millis, shutdown
     * millis).
     */
    private String[] getCommandline() {
        List<String> configArgs = pipesConfig.getForkedJvmArgs();
        boolean hasClassPath = false;
        boolean hasHeadless = false;
        boolean hasExitOnOOM = false;
        for (String arg : configArgs) {
            if (arg.startsWith("-Djava.awt.headless")) {
                hasHeadless = true;
            }
            if (isClassPathOption(arg)) {
                hasClassPath = true;
            }
            if (arg.equals("-XX:+ExitOnOutOfMemoryError") ||
                    arg.equals("-XX:+CrashOnOutOfMemoryError")) {
                hasExitOnOOM = true;
            }
        }
        List<String> commandLine = new ArrayList<>();
        String javaPath = pipesConfig.getJavaPath();
        commandLine.add(ProcessUtils.escapeCommandLine(javaPath));
        if (!hasClassPath) {
            // inherit this JVM's class path
            commandLine.add("-cp");
            commandLine.add(System.getProperty("java.class.path"));
        }
        if (!hasHeadless) {
            commandLine.add("-Djava.awt.headless=true");
        }
        if (!hasExitOnOOM) {
            //warn
            // TODO: no warning is emitted yet; intentionally a no-op until the
            // desired message/semantics are decided
        }
        commandLine.addAll(configArgs);
        commandLine.add("org.apache.tika.pipes.PipesServer");
        commandLine.add(ProcessUtils.escapeCommandLine(
                pipesConfig.getTikaConfig().toAbsolutePath().toString()));
        //turn off emit batching
        String maxForEmitBatchBytes = "0";
        if (pipesConfig instanceof AsyncConfig) {
            maxForEmitBatchBytes =
                    Long.toString(((AsyncConfig) pipesConfig).getMaxForEmitBatchBytes());
        }
        commandLine.add(maxForEmitBatchBytes);
        commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
        commandLine.add(Long.toString(pipesConfig.getShutdownClientAfterMillis()));
        return commandLine.toArray(new String[0]);
    }

    /**
     * Recognizes the spellings of the class path option accepted by the java
     * launcher. {@code --classpath} is kept only because earlier versions of this
     * check matched it.
     */
    private static boolean isClassPathOption(String arg) {
        return arg.equals("-cp") || arg.equals("-classpath") ||
                arg.equals("--class-path") || arg.startsWith("--class-path=") ||
                arg.equals("--classpath");
    }

    /**
     * Drains a stream line by line and forwards each line to SLF4J at the level named
     * by its prefix ({@code "debug "}, {@code "info "}, {@code "warn "},
     * {@code "error "}); the prefix is stripped. Lines without one of these prefixes
     * are discarded.
     */
    public static class LogGobbler implements Runnable {

        private static final Logger SERVER_LOG = LoggerFactory.getLogger(LogGobbler.class);

        private final BufferedReader reader;

        /**
         * @param is stream to drain; decoded as UTF-8
         */
        public LogGobbler(InputStream is) {
            reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
        }

        /**
         * Forwards lines until end of stream or until reading fails.
         */
        @Override
        public void run() {
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    forward(line);
                }
            } catch (IOException e) {
                // best effort: a read failure just ends the gobbler
            }
        }

        /** Routes one line to the logger according to its level prefix. */
        private static void forward(String line) {
            if (line.startsWith("debug ")) {
                SERVER_LOG.debug(line.substring(6));
            } else if (line.startsWith("info ")) {
                SERVER_LOG.info(line.substring(5));
            } else if (line.startsWith("warn ")) {
                SERVER_LOG.warn(line.substring(5));
            } else if (line.startsWith("error ")) {
                SERVER_LOG.error(line.substring(6));
            }
        }
    }
}