blob: 18b2b44f1239ff149923505705d1fdbad21f8743 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.marmotta.platform.core.services.http;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import net.sf.ehcache.Ehcache;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.ProtocolException;
import org.apache.http.RequestLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.cache.CacheResponseStatus;
import org.apache.http.client.cache.HttpCacheContext;
import org.apache.http.client.cache.HttpCacheStorage;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.cache.CacheConfig;
import org.apache.http.impl.client.cache.CachingHttpClientBuilder;
import org.apache.http.impl.client.cache.ehcache.EhcacheHttpCacheStorage;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.params.HttpParams;
import org.apache.http.pool.PoolStats;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.apache.marmotta.platform.core.api.config.ConfigurationService;
import org.apache.marmotta.platform.core.api.http.HttpClientService;
import org.apache.marmotta.platform.core.api.statistics.StatisticsModule;
import org.apache.marmotta.platform.core.api.statistics.StatisticsService;
import org.apache.marmotta.platform.core.api.task.Task;
import org.apache.marmotta.platform.core.api.task.TaskManagerService;
import org.apache.marmotta.platform.core.events.ConfigurationChangedEvent;
import org.apache.marmotta.platform.core.qualifiers.cache.MarmottaCache;
import org.apache.marmotta.platform.core.services.http.response.LastModifiedResponseHandler;
import org.apache.marmotta.platform.core.services.http.response.StatusCodeResponseHandler;
import org.apache.marmotta.platform.core.services.http.response.StringBodyResponseHandler;
import org.slf4j.Logger;
/**
**/
@ApplicationScoped
public class HttpClientServiceImpl implements HttpClientService {
private static final String TASK_GROUP_CLIENT = "HttpClient";
private static final String[] KEYS = {
"requests executed", "payload sent", "payload received", "payload from cache",
"ConnectionManager", "max connections", "available connections",
"active connections", "requests waiting"
};
private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
@Inject
private Logger log;
@Inject
private TaskManagerService taskManagerService;
@Inject
private ConfigurationService configurationService;
@Inject
private StatisticsService statisticsService;
@Inject
@MarmottaCache("http-client-cache")
private Instance<Ehcache> ehcache;
private CloseableHttpClient httpClient;
private IdleConnectionMonitorThread idleConnectionMonitorThread;
// private BasicHttpParams httpParams;
private AtomicLong bytesSent = new AtomicLong();
private AtomicLong bytesReceived = new AtomicLong();
private AtomicLong bytesFromCache = new AtomicLong();
private AtomicLong requestsExecuted = new AtomicLong();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Execute a request and pass the response to the provided handler.
*
* @param request
* @param handler
* @return
* @throws ClientProtocolException
* @throws IOException
*/
@Override
public <T> T execute(HttpRequestBase request, ResponseHandler<? extends T> handler) throws ClientProtocolException, IOException {
if (handler == null)
throw new IllegalArgumentException("Response handler must not be null.");
final long start = System.nanoTime();
String logMessageF = "Request '{}' failed after {}";
try {
final T result = httpClient.execute(request, handler);
logMessageF = "Request '{}' took {}";
return result;
} finally {
log.debug(logMessageF, request.getRequestLine(), formatNanoDuration(System.nanoTime() - start));
}
}
/**
* Get a ready-to-use {@link CloseableHttpClient}.
*/
@Override
public CloseableHttpClient getHttpClient() {
return new ReadLockHttpClient(httpClient);
}
/**
* Execute a request.
*
* <b>ATTENTION</b> Make sure to close the {@link InputStream} in the {@link HttpEntity}, or
* just use {@link #cleanupResponse(HttpResponse)}!
*
* @param request
* @return
* @throws ClientProtocolException
* @throws IOException
* @see #cleanupResponse(HttpResponse)
* @see EntityUtils#consume(HttpEntity)
*/
@Override
public HttpResponse execute(HttpRequestBase request) throws ClientProtocolException, IOException {
final long start = System.nanoTime();
String logMessageF = "Request '{}' failed after {}";
try {
final HttpResponse resp = httpClient.execute(request);
logMessageF = "Request '{}' took {}";
return resp;
} finally {
log.debug(logMessageF, request.getRequestLine(), formatNanoDuration(System.nanoTime() - start));
}
}
@Override
public void cleanupResponse(HttpResponse response) {
if (response == null) throw new IllegalArgumentException("Response must not be null");
EntityUtils.consumeQuietly(response.getEntity());
}
private static String formatNanoDuration(long nano) {
// convert to microseconds (1/1000s)
final long micro = nano / 1000;
if (micro > 1000 * 1000l) {
// more than a second
long millis = micro / 1000l;
if (millis > 60000l)
// more than a minute
return String.format("%d min %.1f sec", millis / (1000 * 60), 0.001d * millis % 60);
else
// more than a second
return String.format("%.1f sec", 0.001d * millis);
} else
// some millis
return String.format("%f ms", 0.001d * micro);
}
@Override
public String doGet(String requestUrl) throws IOException {
return doGet(requestUrl, new StringBodyResponseHandler());
}
@Override
public <T> T doGet(String requestUrl, ResponseHandler<? extends T> responseHandler) throws IOException {
final HttpGet get = new HttpGet(requestUrl);
return execute(get, responseHandler);
}
@Override
public <T> T doPost(String requestUrl, HttpEntity body, ResponseHandler<? extends T> responseHandler) throws IOException {
HttpPost post = new HttpPost(requestUrl);
post.setEntity(body);
return execute(post, responseHandler);
}
@Override
public String doPost(String url, String body) throws IOException {
return doPost(url, new StringEntity(body, ContentType.create("text/plain", DEFAULT_CHARSET)), new StringBodyResponseHandler());
}
@Override
public <T> T doPut(String url, HttpEntity body, ResponseHandler<? extends T> responseHandler) throws IOException {
HttpPut put = new HttpPut(url);
put.setEntity(body);
return execute(put, responseHandler);
}
@Override
public String doPut(String url, String body) throws IOException {
return doPut(url, new StringEntity(body, ContentType.create("text/plain", DEFAULT_CHARSET)), new StringBodyResponseHandler());
}
@Override
public <T> T doDelete(String url, ResponseHandler<? extends T> responseHandler) throws IOException {
HttpDelete delete = new HttpDelete(url);
return execute(delete, responseHandler);
}
@Override
public int doDelete(String url) throws IOException {
return doDelete(url, new StatusCodeResponseHandler());
}
@Override
public <T> T doHead(String url, ResponseHandler<? extends T> responseHandler) throws IOException {
HttpHead head = new HttpHead(url);
return execute(head, responseHandler);
}
@Override
public Date doHead(String url) throws IOException {
return doHead(url, new LastModifiedResponseHandler());
}
protected void onConfigurationChange(@Observes ConfigurationChangedEvent event) {
if (event.containsChangedKeyWithPrefix("core.http.")) {
try {
lock.writeLock().lock();
shutdown();
initialize();
} finally {
lock.writeLock().unlock();
}
}
}
@PostConstruct
protected void initialize() {
try {
lock.writeLock().lock();
final HttpClientBuilder clientBuilder;
if (configurationService.getBooleanConfiguration("core.http.client_cache_enable", true)) {
CacheConfig cacheConfig = CacheConfig.custom()
// FIXME: Hardcoded constants - is this useful?
.setMaxCacheEntries(1000)
.setMaxObjectSize(81920)
.build();
final HttpCacheStorage cacheStore = new EhcacheHttpCacheStorage(ehcache.get(), cacheConfig);
clientBuilder = CachingHttpClientBuilder.create()
.setCacheConfig(cacheConfig)
.setHttpCacheStorage(cacheStore);
} else {
clientBuilder = HttpClients.custom();
}
String userAgentString =
"Apache Marmotta/" + configurationService.getStringConfiguration("kiwi.version") +
" (running at " + configurationService.getServerUri() + ")" +
" lmf-core/" + configurationService.getStringConfiguration("kiwi.version");
clientBuilder.setUserAgent(configurationService.getStringConfiguration("core.http.user_agent", userAgentString));
clientBuilder.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(configurationService.getIntConfiguration("core.http.so_timeout", 60000))
.setConnectTimeout(configurationService.getIntConfiguration("core.http.connection_timeout", 10000))
.setRedirectsEnabled(true)
.setMaxRedirects(3)
.build());
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(configurationService.getIntConfiguration("core.http.max_connections", 20));
cm.setDefaultMaxPerRoute(configurationService.getIntConfiguration("core.http.max_connections_per_route", 10));
clientBuilder.setConnectionManager(cm);
clientBuilder.setRedirectStrategy(new LMFRedirectStrategy());
clientBuilder.setRetryHandler(new LMFHttpRequestRetryHandler());
this.httpClient = new MonitoredHttpClient(clientBuilder.build());
bytesSent.set(0);
bytesReceived.set(0);
requestsExecuted.set(0);
idleConnectionMonitorThread = new IdleConnectionMonitorThread(cm);
idleConnectionMonitorThread.start();
StatisticsProvider stats = new StatisticsProvider(cm);
statisticsService.registerModule(HttpClientService.class.getSimpleName(), stats);
} finally {
lock.writeLock().unlock();
}
}
@PreDestroy
protected void shutdown() {
try {
lock.writeLock().lock();
statisticsService.unregisterModule(HttpClientService.class.getSimpleName());
idleConnectionMonitorThread.shutdown();
try {
httpClient.close();
} catch (IOException e) {
log.warn("Exception while closing HttpClient: {}", e.getMessage());
}
} finally {
lock.writeLock().unlock();
}
}
private static class LMFRedirectStrategy extends DefaultRedirectStrategy {
@Override
public boolean isRedirected(HttpRequest request, HttpResponse response, HttpContext context) throws ProtocolException {
if (request == null)
throw new IllegalArgumentException("HTTP request must not be null");
if (response == null)
throw new IllegalArgumentException("HTTP response must not be null");
int statusCode = response.getStatusLine().getStatusCode();
String method = request.getRequestLine().getMethod();
Header locationHeader = response.getFirstHeader("location");
switch (statusCode) {
case HttpStatus.SC_MOVED_TEMPORARILY:
return (method.equalsIgnoreCase(HttpGet.METHOD_NAME) || method.equalsIgnoreCase(HttpHead.METHOD_NAME))
&& locationHeader != null;
case HttpStatus.SC_MOVED_PERMANENTLY:
case HttpStatus.SC_TEMPORARY_REDIRECT:
return method.equalsIgnoreCase(HttpGet.METHOD_NAME)
|| method.equalsIgnoreCase(HttpHead.METHOD_NAME);
case HttpStatus.SC_SEE_OTHER:
return true;
case HttpStatus.SC_MULTIPLE_CHOICES:
return true;
default:
return false;
} // end of switch
}
}
private static class LMFHttpRequestRetryHandler implements HttpRequestRetryHandler {
/**
* Determines if a method should be retried after an IOException occurs
* during execution.
*
* @param exception
* the exception that occurred
* @param executionCount
* the number of times this method has been unsuccessfully
* executed
* @param context
* the context for the request execution
* @return <code>true</code> if the method should be retried, <code>false</code> otherwise
*/
@Override
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
return false;
}
}
private static class IdleConnectionMonitorThread extends Thread {
private final HttpClientConnectionManager connMgr;
private volatile boolean shutdown;
public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
super("HttpClientService Idle Connection Manager");
this.connMgr = connMgr;
setDaemon(true);
}
@Override
public void run() {
try {
while (!shutdown) {
synchronized (this) {
wait(5000);
// Close expired connections
connMgr.closeExpiredConnections();
// Optionally, close connections
// that have been idle longer than 30 sec
connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
}
}
} catch (InterruptedException ex) {
// terminate
}
}
public void shutdown() {
synchronized (this) {
shutdown = true;
notifyAll();
}
}
}
private class StatisticsProvider implements StatisticsModule {
private boolean enabled;
private PoolingHttpClientConnectionManager connectionManager;
public StatisticsProvider(PoolingHttpClientConnectionManager cm) {
this.connectionManager = cm;
enabled = true;
}
@Override
public void enable() {
enabled = true;
}
@Override
public void disable() {
enabled = false;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public List<String> getPropertyNames() {
return Collections.unmodifiableList(Arrays.asList(KEYS));
}
@Override
public Map<String, String> getStatistics() {
int i = 0;
final Map<String, String> data = new LinkedHashMap<String, String>();
data.put(KEYS[i++], String.valueOf(requestsExecuted.get()));
data.put(KEYS[i++], humanReadableBytes(bytesSent.get(), false));
data.put(KEYS[i++], humanReadableBytes(bytesReceived.get(), false));
data.put(KEYS[i++], humanReadableBytes(bytesFromCache.get(), false));
final PoolStats cmStats = connectionManager.getTotalStats();
data.put(KEYS[i++], connectionManager.getClass().getSimpleName());
data.put(KEYS[i++], String.valueOf(cmStats.getMax()));
data.put(KEYS[i++], String.valueOf(cmStats.getAvailable()));
data.put(KEYS[i++], String.valueOf(cmStats.getLeased()));
data.put(KEYS[i++], String.valueOf(cmStats.getPending()));
return data;
}
private String humanReadableBytes(long bytes, boolean si) {
final int unit = si ? 1000 : 1024;
if (bytes < unit) return bytes + " B";
int exp = (int) (Math.log(bytes) / Math.log(unit));
String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp - 1) + (si ? "" : "i");
return String.format("%.2f %sB", bytes / Math.pow(unit, exp), pre);
}
@Override
public String getName() {
return HttpClientService.class.getSimpleName();
}
}
private class ReadLockHttpClient extends CloseableHttpClient {
private final CloseableHttpClient delegate;
public ReadLockHttpClient(CloseableHttpClient delegate) {
this.delegate = delegate;
}
@Override
@Deprecated
public HttpParams getParams() {
return delegate.getParams();
}
@Override
@Deprecated
public ClientConnectionManager getConnectionManager() {
return delegate.getConnectionManager();
}
@Override
public void close() throws IOException {
// Nop; this Client is read only!
}
@Override
protected CloseableHttpResponse doExecute(HttpHost target,
HttpRequest request, HttpContext context) throws IOException,
ClientProtocolException {
lock.readLock().lock();
try {
return delegate.execute(target, request, context);
} finally {
lock.readLock().lock();
}
}
}
protected class MonitoredHttpClient extends CloseableHttpClient {
private final CloseableHttpClient delegate;
public MonitoredHttpClient(CloseableHttpClient delegate) {
this.delegate = delegate;
}
private Task preProcess(HttpRequest request) {
final RequestLine rl = request.getRequestLine();
final String taskName = String.format("%S %s %S", rl.getMethod(), rl.getUri(), request.getProtocolVersion());
final Task task = taskManagerService.createSubTask(taskName, TASK_GROUP_CLIENT);
task.updateMessage("preparing request");
task.updateDetailMessage("method", rl.getMethod());
task.updateDetailMessage("url", rl.getUri());
// TODO: some more detail messages?
if (request instanceof HttpEntityEnclosingRequest) {
// To report upload progress, the entity is wrapped in a MonitoredHttpEntity.
final HttpEntityEnclosingRequest entityRequest = (HttpEntityEnclosingRequest) request;
entityRequest.setEntity(new MonitoredHttpEntity(entityRequest.getEntity(), task, bytesSent));
}
task.updateMessage("sending request");
return task;
}
private CloseableHttpResponse postProcess(final CloseableHttpResponse response, HttpContext context, final Task task) {
requestsExecuted.incrementAndGet();
task.resetProgress();
task.updateMessage("retrieving response");
if (response.getEntity() != null) {
boolean cachedResponse;
if (context != null && context instanceof HttpCacheContext) {
HttpCacheContext cacheContext = (HttpCacheContext) context;
CacheResponseStatus cacheRespStatus = cacheContext.getCacheResponseStatus();
// To report download progress, the entity is wrapped in a MonitoredHttpEntity.
cachedResponse = cacheRespStatus != null && cacheRespStatus != CacheResponseStatus.CACHE_MISS;
} else {
cachedResponse = false;
}
response.setEntity(new MonitoredHttpEntity(response.getEntity(), task, cachedResponse ? bytesFromCache : bytesReceived));
} else {
task.endTask();
}
return response;
}
@Override
@Deprecated
public HttpParams getParams() {
return delegate.getParams();
}
@Override
@Deprecated
public ClientConnectionManager getConnectionManager() {
return delegate.getConnectionManager();
}
@Override
public void close() throws IOException {
delegate.close();
}
@Override
protected CloseableHttpResponse doExecute(HttpHost target, HttpRequest request, HttpContext context) throws IOException, ClientProtocolException {
final Task task = preProcess(request);
lock.readLock().lock();
try {
return postProcess(delegate.execute(target, request, context), context, task);
} catch (ClientProtocolException cpe) {
task.endTask();
throw cpe;
} catch (IOException io) {
task.endTask();
throw io;
} finally {
lock.readLock().unlock();
}
}
}
}