blob: 2c366a64d25bc1a45d6330b1630a9834f5534965 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.server.client;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.exception.TikaException;
/**
* Low-level class to handle the http layer.
*/
class TikaHttpClient {
private static final String EMIT_ENDPOINT = "emit";
private static final String TIKA_ENDPOINT = "tika";
private static final String ASYNC_ENDPOINT = "async";
private static final Logger LOGGER = LoggerFactory.getLogger(TikaHttpClient.class);
private final HttpHost httpHost;
private final HttpClient httpClient;
private final String emitEndPointUrl;
private final String asyncEndPointUrl;
private final String tikaUrl;
private final int maxRetries = 3;
//if can't make contact with Tika server, max wait time in ms
private final long maxWaitForTikaMs = 120000;
//how often to ping /tika (in ms) to see if the server is up and running
private final long pulseWaitForTikaMs = 1000;
/**
* @param baseUrl url to base endpoint
* @param httpHost
* @param httpClient
*/
private TikaHttpClient(String baseUrl, HttpHost httpHost, HttpClient httpClient) {
if (!baseUrl.endsWith("/")) {
baseUrl += "/";
}
this.emitEndPointUrl = baseUrl + EMIT_ENDPOINT;
this.asyncEndPointUrl = baseUrl + ASYNC_ENDPOINT;
this.tikaUrl = baseUrl + TIKA_ENDPOINT;
this.httpHost = httpHost;
this.httpClient = httpClient;
}
static TikaHttpClient get(String baseUrl) throws TikaClientConfigException {
URI uri;
try {
uri = new URI(baseUrl);
} catch (URISyntaxException e) {
throw new TikaClientConfigException("bad URI", e);
}
HttpHost httpHost = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
//TODO: need to add other configuration stuff? proxy, username, password, timeouts...
HttpClient client = HttpClients.createDefault();
return new TikaHttpClient(baseUrl, httpHost, client);
}
public TikaEmitterResult postJsonAsync(String jsonRequest) {
return postJson(asyncEndPointUrl, jsonRequest);
}
public TikaEmitterResult postJson(String jsonRequest) {
return postJson(emitEndPointUrl, jsonRequest);
}
private TikaEmitterResult postJson(String endPoint, String jsonRequest) {
HttpPost post = new HttpPost(endPoint);
ByteArrayEntity entity = new ByteArrayEntity(jsonRequest.getBytes(StandardCharsets.UTF_8));
post.setEntity(entity);
post.setHeader("Content-Type", "application/json");
int tries = 0;
long start = System.currentTimeMillis();
try {
while (tries++ < maxRetries) {
HttpResponse response = null;
try {
response = httpClient.execute(httpHost, post);
} catch (IOException e) {
LOGGER.warn("Exception trying to parse", e);
waitForServer();
continue;
}
String msg = "";
try {
msg = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
} catch (IOException e) {
//swallow
}
long elapsed = System.currentTimeMillis() - start;
TikaEmitterResult.STATUS status = TikaEmitterResult.STATUS.OK;
if (response.getStatusLine().getStatusCode() != 200) {
status = TikaEmitterResult.STATUS.NOT_OK;
} else {
//pull out stacktrace from parse exception?
}
return new TikaEmitterResult(status, elapsed, msg);
}
} catch (TimeoutWaitingForTikaException e) {
long elapsed = System.currentTimeMillis() - start;
return new TikaEmitterResult(TikaEmitterResult.STATUS.TIMED_OUT_WAITING_FOR_TIKA,
elapsed, "");
}
long elapsed = System.currentTimeMillis() - start;
return new TikaEmitterResult(TikaEmitterResult.STATUS.EXCEEDED_MAX_RETRIES, elapsed, "");
}
private void waitForServer() throws TimeoutWaitingForTikaException {
long start = System.currentTimeMillis();
long elapsed = System.currentTimeMillis() - start;
LOGGER.info("server unreachable; waiting for it to restart");
while (elapsed < maxWaitForTikaMs) {
try {
Thread.sleep(pulseWaitForTikaMs);
} catch (InterruptedException e) {
//swallow
}
HttpGet get = new HttpGet(tikaUrl);
try {
HttpResponse response = httpClient.execute(httpHost, get);
if (response.getStatusLine().getStatusCode() == 200) {
LOGGER.debug("server back up");
return;
}
} catch (IOException e) {
elapsed = System.currentTimeMillis() - start;
LOGGER.debug("waiting for server; failed to reach it: {} ms", elapsed);
}
elapsed = System.currentTimeMillis() - start;
}
LOGGER.warn("Timeout waiting for tika server {} in {} ms", tikaUrl, elapsed);
throw new TimeoutWaitingForTikaException("");
}
private static class TimeoutWaitingForTikaException extends TikaException {
public TimeoutWaitingForTikaException(String msg) {
super(msg);
}
}
}