blob: 2ccae573097c7c400acb93d5a9f2785fb23ca24d [file] [log] [blame]
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecCallback;
import org.apache.hc.client5.http.async.AsyncExecChain;
import org.apache.hc.client5.http.async.AsyncExecChainHandler;
import org.apache.hc.client5.http.async.methods.SimpleBody;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.cache.CacheResponseStatus;
import org.apache.hc.client5.http.cache.HeaderConstants;
import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.cache.ResourceFactory;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.client5.http.impl.ExecSupport;
import org.apache.hc.client5.http.impl.RequestCopier;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.schedule.SchedulingStrategy;
import org.apache.hc.client5.http.utils.DateUtils;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.impl.BasicEntityDetails;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.ByteArrayBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Request executor in the request execution chain that is responsible for
* transparent client-side caching.
* <p>
* The current implementation is conditionally
* compliant with HTTP/1.1 (meaning all the MUST and MUST NOTs are obeyed),
* although quite a lot, though not all, of the SHOULDs and SHOULD NOTs
* are obeyed too.
*
* @since 5.0
*/
@Contract(threading = ThreadingBehavior.SAFE) // So long as the responseCache implementation is threadsafe
class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler {
private static final Logger LOG = LoggerFactory.getLogger(AsyncCachingExec.class);
private final HttpAsyncCache responseCache;
private final DefaultAsyncCacheRevalidator cacheRevalidator;
private final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder;
AsyncCachingExec(final HttpAsyncCache cache, final DefaultAsyncCacheRevalidator cacheRevalidator, final CacheConfig config) {
super(config);
this.responseCache = Args.notNull(cache, "Response cache");
this.cacheRevalidator = cacheRevalidator;
this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(RequestCopier.INSTANCE);
}
AsyncCachingExec(
final HttpAsyncCache responseCache,
final CacheValidityPolicy validityPolicy,
final ResponseCachingPolicy responseCachingPolicy,
final CachedHttpResponseGenerator responseGenerator,
final CacheableRequestPolicy cacheableRequestPolicy,
final CachedResponseSuitabilityChecker suitabilityChecker,
final ResponseProtocolCompliance responseCompliance,
final RequestProtocolCompliance requestCompliance,
final DefaultAsyncCacheRevalidator cacheRevalidator,
final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder,
final CacheConfig config) {
super(validityPolicy, responseCachingPolicy, responseGenerator, cacheableRequestPolicy,
suitabilityChecker, responseCompliance, requestCompliance, config);
this.responseCache = responseCache;
this.cacheRevalidator = cacheRevalidator;
this.conditionalRequestBuilder = conditionalRequestBuilder;
}
AsyncCachingExec(
final HttpAsyncCache cache,
final ScheduledExecutorService executorService,
final SchedulingStrategy schedulingStrategy,
final CacheConfig config) {
this(cache,
executorService != null ? new DefaultAsyncCacheRevalidator(executorService, schedulingStrategy) : null,
config);
}
AsyncCachingExec(
final ResourceFactory resourceFactory,
final HttpAsyncCacheStorage storage,
final ScheduledExecutorService executorService,
final SchedulingStrategy schedulingStrategy,
final CacheConfig config) {
this(new BasicHttpAsyncCache(resourceFactory, storage), executorService, schedulingStrategy, config);
}
private void triggerResponse(
final SimpleHttpResponse cacheResponse,
final AsyncExecChain.Scope scope,
final AsyncExecCallback asyncExecCallback) {
scope.clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, cacheResponse);
scope.execRuntime.releaseEndpoint();
final SimpleBody body = cacheResponse.getBody();
final byte[] content = body != null ? body.getBodyBytes() : null;
final ContentType contentType = body != null ? body.getContentType() : null;
try {
final AsyncDataConsumer dataConsumer = asyncExecCallback.handleResponse(
cacheResponse,
content != null ? new BasicEntityDetails(content.length, contentType) : null);
if (dataConsumer != null) {
if (content != null) {
dataConsumer.consume(ByteBuffer.wrap(content));
}
dataConsumer.streamEnd(null);
}
asyncExecCallback.completed();
} catch (final HttpException | IOException ex) {
asyncExecCallback.failed(ex);
}
}
static class AsyncExecCallbackWrapper implements AsyncExecCallback {
private final AsyncExecCallback asyncExecCallback;
private final Runnable command;
AsyncExecCallbackWrapper(final AsyncExecCallback asyncExecCallback, final Runnable command) {
this.asyncExecCallback = asyncExecCallback;
this.command = command;
}
@Override
public AsyncDataConsumer handleResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
return null;
}
@Override
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
}
@Override
public void completed() {
command.run();
}
@Override
public void failed(final Exception cause) {
asyncExecCallback.failed(cause);
}
}
@Override
public void execute(
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
Args.notNull(request, "HTTP request");
Args.notNull(scope, "Scope");
final HttpRoute route = scope.route;
final CancellableDependency operation = scope.cancellableDependency;
final HttpClientContext context = scope.clientContext;
context.setAttribute(HttpClientContext.HTTP_ROUTE, route);
context.setAttribute(HttpClientContext.HTTP_REQUEST, request);
final URIAuthority authority = request.getAuthority();
final String scheme = request.getScheme();
final HttpHost target = authority != null ? new HttpHost(scheme, authority) : route.getTargetHost();
final String via = generateViaHeader(request);
// default response context
setResponseStatus(context, CacheResponseStatus.CACHE_MISS);
if (clientRequestsOurOptions(request)) {
setResponseStatus(context, CacheResponseStatus.CACHE_MODULE_RESPONSE);
triggerResponse(SimpleHttpResponse.create(HttpStatus.SC_NOT_IMPLEMENTED), scope, asyncExecCallback);
return;
}
final SimpleHttpResponse fatalErrorResponse = getFatallyNoncompliantResponse(request, context);
if (fatalErrorResponse != null) {
triggerResponse(fatalErrorResponse, scope, asyncExecCallback);
return;
}
requestCompliance.makeRequestCompliant(request);
request.addHeader("Via",via);
if (!cacheableRequestPolicy.isServableFromCache(request)) {
LOG.debug("Request is not servable from cache");
operation.setDependency(responseCache.flushCacheEntriesInvalidatedByRequest(target, request, new FutureCallback<Boolean>() {
@Override
public void completed(final Boolean result) {
callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
}
@Override
public void failed(final Exception cause) {
asyncExecCallback.failed(cause);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
}));
} else {
operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
@Override
public void completed(final HttpCacheEntry entry) {
if (entry == null) {
LOG.debug("Cache miss");
handleCacheMiss(target, request, entityProducer, scope, chain, asyncExecCallback);
} else {
handleCacheHit(target, request, entityProducer, scope, chain, asyncExecCallback, entry);
}
}
@Override
public void failed(final Exception cause) {
asyncExecCallback.failed(cause);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
}));
}
}
void chainProceed(
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) {
try {
chain.proceed(request, entityProducer, scope, asyncExecCallback);
} catch (final HttpException | IOException ex) {
asyncExecCallback.failed(ex);
}
}
void callBackend(
final HttpHost target,
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) {
LOG.debug("Calling the backend");
final Date requestDate = getCurrentDate();
final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
chainProceed(request, entityProducer, scope, chain, new AsyncExecCallback() {
@Override
public AsyncDataConsumer handleResponse(
final HttpResponse backendResponse,
final EntityDetails entityDetails) throws HttpException, IOException {
final Date responseDate = getCurrentDate();
backendResponse.addHeader("Via", generateViaHeader(backendResponse));
final AsyncExecCallback callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
callbackRef.set(callback);
return callback.handleResponse(backendResponse, entityDetails);
}
@Override
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
final AsyncExecCallback callback = callbackRef.getAndSet(null);
if (callback != null) {
callback.handleInformationResponse(response);
} else {
asyncExecCallback.handleInformationResponse(response);
}
}
@Override
public void completed() {
final AsyncExecCallback callback = callbackRef.getAndSet(null);
if (callback != null) {
callback.completed();
} else {
asyncExecCallback.completed();
}
}
@Override
public void failed(final Exception cause) {
final AsyncExecCallback callback = callbackRef.getAndSet(null);
if (callback != null) {
callback.failed(cause);
} else {
asyncExecCallback.failed(cause);
}
}
});
}
class CachingAsyncDataConsumer implements AsyncDataConsumer {
private final AsyncExecCallback fallback;
private final HttpResponse backendResponse;
private final EntityDetails entityDetails;
private final AtomicBoolean writtenThrough;
private final AtomicReference<ByteArrayBuffer> bufferRef;
private final AtomicReference<AsyncDataConsumer> dataConsumerRef;
CachingAsyncDataConsumer(
final AsyncExecCallback fallback,
final HttpResponse backendResponse,
final EntityDetails entityDetails) {
this.fallback = fallback;
this.backendResponse = backendResponse;
this.entityDetails = entityDetails;
this.writtenThrough = new AtomicBoolean(false);
this.bufferRef = new AtomicReference<>(entityDetails != null ? new ByteArrayBuffer(1024) : null);
this.dataConsumerRef = new AtomicReference<>();
}
@Override
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
if (dataConsumer != null) {
dataConsumer.updateCapacity(capacityChannel);
} else {
capacityChannel.update(Integer.MAX_VALUE);
}
}
@Override
public final void consume(final ByteBuffer src) throws IOException {
final ByteArrayBuffer buffer = bufferRef.get();
if (buffer != null) {
if (src.hasArray()) {
buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
} else {
while (src.hasRemaining()) {
buffer.append(src.get());
}
}
if (buffer.length() > cacheConfig.getMaxObjectSize()) {
LOG.debug("Backend response content length exceeds maximum");
// Over the max limit. Stop buffering and forward the response
// along with all the data buffered so far to the caller.
bufferRef.set(null);
try {
final AsyncDataConsumer dataConsumer = fallback.handleResponse(backendResponse, entityDetails);
if (dataConsumer != null) {
dataConsumerRef.set(dataConsumer);
writtenThrough.set(true);
dataConsumer.consume(ByteBuffer.wrap(buffer.array(), 0, buffer.length()));
}
} catch (final HttpException ex) {
fallback.failed(ex);
}
}
} else {
final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
if (dataConsumer != null) {
dataConsumer.consume(src);
}
}
}
@Override
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
if (dataConsumer != null) {
dataConsumer.streamEnd(trailers);
}
}
@Override
public void releaseResources() {
final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
if (dataConsumer != null) {
dataConsumer.releaseResources();
}
}
}
class BackendResponseHandler implements AsyncExecCallback {
private final HttpHost target;
private final HttpRequest request;
private final Date requestDate;
private final Date responseDate;
private final AsyncExecChain.Scope scope;
private final AsyncExecCallback asyncExecCallback;
private final AtomicReference<CachingAsyncDataConsumer> cachingConsumerRef;
BackendResponseHandler(
final HttpHost target,
final HttpRequest request,
final Date requestDate,
final Date responseDate,
final AsyncExecChain.Scope scope,
final AsyncExecCallback asyncExecCallback) {
this.target = target;
this.request = request;
this.requestDate = requestDate;
this.responseDate = responseDate;
this.scope = scope;
this.asyncExecCallback = asyncExecCallback;
this.cachingConsumerRef = new AtomicReference<>();
}
@Override
public AsyncDataConsumer handleResponse(
final HttpResponse backendResponse,
final EntityDetails entityDetails) throws HttpException, IOException {
responseCompliance.ensureProtocolCompliance(scope.originalRequest, request, backendResponse);
responseCache.flushCacheEntriesInvalidatedByExchange(target, request, backendResponse, new FutureCallback<Boolean>() {
@Override
public void completed(final Boolean result) {
}
@Override
public void failed(final Exception ex) {
LOG.warn("Unable to flush invalidated entries from cache", ex);
}
@Override
public void cancelled() {
}
});
final boolean cacheable = responseCachingPolicy.isResponseCacheable(request, backendResponse);
if (cacheable) {
cachingConsumerRef.set(new CachingAsyncDataConsumer(asyncExecCallback, backendResponse, entityDetails));
storeRequestIfModifiedSinceFor304Response(request, backendResponse);
} else {
LOG.debug("Backend response is not cacheable");
responseCache.flushCacheEntriesFor(target, request, new FutureCallback<Boolean>() {
@Override
public void completed(final Boolean result) {
}
@Override
public void failed(final Exception ex) {
LOG.warn("Unable to flush invalidated entries from cache", ex);
}
@Override
public void cancelled() {
}
});
}
final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.get();
if (cachingDataConsumer != null) {
LOG.debug("Caching backend response");
return cachingDataConsumer;
}
return asyncExecCallback.handleResponse(backendResponse, entityDetails);
}
@Override
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
asyncExecCallback.handleInformationResponse(response);
}
void triggerNewCacheEntryResponse(final HttpResponse backendResponse, final Date responseDate, final ByteArrayBuffer buffer) {
final CancellableDependency operation = scope.cancellableDependency;
operation.setDependency(responseCache.createCacheEntry(
target,
request,
backendResponse,
buffer,
requestDate,
responseDate,
new FutureCallback<HttpCacheEntry>() {
@Override
public void completed(final HttpCacheEntry newEntry) {
LOG.debug("Backend response successfully cached");
try {
final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, newEntry);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final ResourceIOException ex) {
asyncExecCallback.failed(ex);
}
}
@Override
public void failed(final Exception ex) {
asyncExecCallback.failed(ex);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
}));
}
@Override
public void completed() {
final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.getAndSet(null);
if (cachingDataConsumer != null && !cachingDataConsumer.writtenThrough.get()) {
final ByteArrayBuffer buffer = cachingDataConsumer.bufferRef.getAndSet(null);
final HttpResponse backendResponse = cachingDataConsumer.backendResponse;
if (cacheConfig.isFreshnessCheckEnabled()) {
final CancellableDependency operation = scope.cancellableDependency;
operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
@Override
public void completed(final HttpCacheEntry existingEntry) {
if (DateUtils.isAfter(existingEntry, backendResponse, HttpHeaders.DATE)) {
LOG.debug("Backend already contains fresher cache entry");
try {
final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, existingEntry);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final ResourceIOException ex) {
asyncExecCallback.failed(ex);
}
} else {
triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
}
}
@Override
public void failed(final Exception cause) {
asyncExecCallback.failed(cause);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
}));
} else {
triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
}
} else {
asyncExecCallback.completed();
}
}
@Override
public void failed(final Exception cause) {
asyncExecCallback.failed(cause);
}
}
private void handleCacheHit(
final HttpHost target,
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback,
final HttpCacheEntry entry) {
final HttpClientContext context = scope.clientContext;
recordCacheHit(target, request);
final Date now = getCurrentDate();
if (suitabilityChecker.canCachedResponseBeUsed(target, request, entry, now)) {
LOG.debug("Cache hit");
try {
final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final ResourceIOException ex) {
recordCacheFailure(target, request);
if (!mayCallBackend(request)) {
final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} else {
setResponseStatus(scope.clientContext, CacheResponseStatus.FAILURE);
try {
chain.proceed(request, entityProducer, scope, asyncExecCallback);
} catch (final HttpException | IOException ex2) {
asyncExecCallback.failed(ex2);
}
}
}
} else if (!mayCallBackend(request)) {
LOG.debug("Cache entry not suitable but only-if-cached requested");
final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} else if (!(entry.getStatus() == HttpStatus.SC_NOT_MODIFIED && !suitabilityChecker.isConditional(request))) {
LOG.debug("Revalidating cache entry");
if (cacheRevalidator != null
&& !staleResponseNotAllowed(request, entry, now)
&& validityPolicy.mayReturnStaleWhileRevalidating(entry, now)) {
LOG.debug("Serving stale with asynchronous revalidation");
try {
final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now);
final String exchangeId = ExecSupport.getNextExchangeId();
final AsyncExecChain.Scope fork = new AsyncExecChain.Scope(
exchangeId,
scope.route,
scope.originalRequest,
new ComplexFuture<>(null),
HttpClientContext.create(),
scope.execRuntime.fork());
cacheRevalidator.revalidateCacheEntry(
responseCache.generateKey(target, request, entry),
asyncExecCallback,
new DefaultAsyncCacheRevalidator.RevalidationCall() {
@Override
public void execute(final AsyncExecCallback asyncExecCallback) {
revalidateCacheEntry(target, request, entityProducer, fork, chain, asyncExecCallback, entry);
}
});
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final ResourceIOException ex) {
asyncExecCallback.failed(ex);
}
} else {
revalidateCacheEntry(target, request, entityProducer, scope, chain, asyncExecCallback, entry);
}
} else {
LOG.debug("Cache entry not usable; calling backend");
callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
}
}
void revalidateCacheEntry(
final HttpHost target,
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback,
final HttpCacheEntry cacheEntry) {
final Date requestDate = getCurrentDate();
final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequest(scope.originalRequest, cacheEntry);
chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {
final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
void triggerUpdatedCacheEntryResponse(final HttpResponse backendResponse, final Date responseDate) {
final CancellableDependency operation = scope.cancellableDependency;
recordCacheUpdate(scope.clientContext);
operation.setDependency(responseCache.updateCacheEntry(
target,
request,
cacheEntry,
backendResponse,
requestDate,
responseDate,
new FutureCallback<HttpCacheEntry>() {
@Override
public void completed(final HttpCacheEntry updatedEntry) {
if (suitabilityChecker.isConditional(request)
&& suitabilityChecker.allConditionalsMatch(request, updatedEntry, new Date())) {
final SimpleHttpResponse cacheResponse = responseGenerator.generateNotModifiedResponse(updatedEntry);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} else {
try {
final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, updatedEntry);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final ResourceIOException ex) {
asyncExecCallback.failed(ex);
}
}
}
@Override
public void failed(final Exception ex) {
asyncExecCallback.failed(ex);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
}));
}
void triggerResponseStaleCacheEntry() {
try {
final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, cacheEntry);
cacheResponse.addHeader(HeaderConstants.WARNING, "110 localhost \"Response is stale\"");
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final ResourceIOException ex) {
asyncExecCallback.failed(ex);
}
}
AsyncExecCallback evaluateResponse(final HttpResponse backendResponse, final Date responseDate) {
backendResponse.addHeader(HeaderConstants.VIA, generateViaHeader(backendResponse));
final int statusCode = backendResponse.getCode();
if (statusCode == HttpStatus.SC_NOT_MODIFIED || statusCode == HttpStatus.SC_OK) {
recordCacheUpdate(scope.clientContext);
}
if (statusCode == HttpStatus.SC_NOT_MODIFIED) {
return new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
@Override
public void run() {
triggerUpdatedCacheEntryResponse(backendResponse, responseDate);
}
});
}
if (staleIfErrorAppliesTo(statusCode)
&& !staleResponseNotAllowed(request, cacheEntry, getCurrentDate())
&& validityPolicy.mayReturnStaleIfError(request, cacheEntry, responseDate)) {
return new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
@Override
public void run() {
triggerResponseStaleCacheEntry();
}
});
}
return new BackendResponseHandler(target, conditionalRequest, requestDate, responseDate, scope, asyncExecCallback);
}
@Override
public AsyncDataConsumer handleResponse(
final HttpResponse backendResponse1,
final EntityDetails entityDetails) throws HttpException, IOException {
final Date responseDate1 = getCurrentDate();
final AsyncExecCallback callback1;
if (revalidationResponseIsTooOld(backendResponse1, cacheEntry)
&& (entityProducer == null || entityProducer.isRepeatable())) {
final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(
scope.originalRequest);
callback1 = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
@Override
public void run() {
chainProceed(unconditional, entityProducer, scope, chain, new AsyncExecCallback() {
@Override
public AsyncDataConsumer handleResponse(
final HttpResponse backendResponse2,
final EntityDetails entityDetails) throws HttpException, IOException {
final Date responseDate2 = getCurrentDate();
final AsyncExecCallback callback2 = evaluateResponse(backendResponse2, responseDate2);
callbackRef.set(callback2);
return callback2.handleResponse(backendResponse2, entityDetails);
}
@Override
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
if (callback2 != null) {
callback2.handleInformationResponse(response);
} else {
asyncExecCallback.handleInformationResponse(response);
}
}
@Override
public void completed() {
final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
if (callback2 != null) {
callback2.completed();
} else {
asyncExecCallback.completed();
}
}
@Override
public void failed(final Exception cause) {
final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
if (callback2 != null) {
callback2.failed(cause);
} else {
asyncExecCallback.failed(cause);
}
}
});
}
});
} else {
callback1 = evaluateResponse(backendResponse1, responseDate1);
}
callbackRef.set(callback1);
return callback1.handleResponse(backendResponse1, entityDetails);
}
@Override
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
if (callback1 != null) {
callback1.handleInformationResponse(response);
} else {
asyncExecCallback.handleInformationResponse(response);
}
}
@Override
public void completed() {
final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
if (callback1 != null) {
callback1.completed();
} else {
asyncExecCallback.completed();
}
}
@Override
public void failed(final Exception cause) {
final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
if (callback1 != null) {
callback1.failed(cause);
} else {
asyncExecCallback.failed(cause);
}
}
});
}
private void handleCacheMiss(
final HttpHost target,
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) {
recordCacheMiss(target, request);
if (mayCallBackend(request)) {
final CancellableDependency operation = scope.cancellableDependency;
operation.setDependency(responseCache.getVariantCacheEntriesWithEtags(
target,
request,
new FutureCallback<Map<String, Variant>>() {
@Override
public void completed(final Map<String, Variant> variants) {
if (variants != null && !variants.isEmpty() && (entityProducer == null || entityProducer.isRepeatable())) {
negotiateResponseFromVariants(target, request, entityProducer, scope, chain, asyncExecCallback, variants);
} else {
callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
}
}
@Override
public void failed(final Exception ex) {
asyncExecCallback.failed(ex);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
}));
} else {
final SimpleHttpResponse cacheResponse = SimpleHttpResponse.create(HttpStatus.SC_GATEWAY_TIMEOUT, "Gateway Timeout");
triggerResponse(cacheResponse, scope, asyncExecCallback);
}
}
void negotiateResponseFromVariants(
final HttpHost target,
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback,
final Map<String, Variant> variants) {
final CancellableDependency operation = scope.cancellableDependency;
final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequestFromVariants(request, variants);
final Date requestDate = getCurrentDate();
chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {
final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
void updateVariantCacheEntry(final HttpResponse backendResponse, final Date responseDate, final Variant matchingVariant) {
recordCacheUpdate(scope.clientContext);
operation.setDependency(responseCache.updateVariantCacheEntry(
target,
conditionalRequest,
backendResponse,
matchingVariant,
requestDate,
responseDate,
new FutureCallback<HttpCacheEntry>() {
@Override
public void completed(final HttpCacheEntry responseEntry) {
if (shouldSendNotModifiedResponse(request, responseEntry)) {
final SimpleHttpResponse cacheResponse = responseGenerator.generateNotModifiedResponse(responseEntry);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} else {
try {
final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, responseEntry);
operation.setDependency(responseCache.reuseVariantEntryFor(
target,
request,
matchingVariant,
new FutureCallback<Boolean>() {
@Override
public void completed(final Boolean result) {
triggerResponse(cacheResponse, scope, asyncExecCallback);
}
@Override
public void failed(final Exception ex) {
asyncExecCallback.failed(ex);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
}));
} catch (final ResourceIOException ex) {
asyncExecCallback.failed(ex);
}
}
}
@Override
public void failed(final Exception ex) {
asyncExecCallback.failed(ex);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
}));
}
@Override
public AsyncDataConsumer handleResponse(
final HttpResponse backendResponse,
final EntityDetails entityDetails) throws HttpException, IOException {
final Date responseDate = getCurrentDate();
backendResponse.addHeader("Via", generateViaHeader(backendResponse));
final AsyncExecCallback callback;
if (backendResponse.getCode() != HttpStatus.SC_NOT_MODIFIED) {
callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
} else {
final Header resultEtagHeader = backendResponse.getFirstHeader(HeaderConstants.ETAG);
if (resultEtagHeader == null) {
LOG.warn("304 response did not contain ETag");
callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
@Override
public void run() {
callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
}
});
} else {
final String resultEtag = resultEtagHeader.getValue();
final Variant matchingVariant = variants.get(resultEtag);
if (matchingVariant == null) {
LOG.debug("304 response did not contain ETag matching one sent in If-None-Match");
callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
@Override
public void run() {
callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
}
});
} else {
if (revalidationResponseIsTooOld(backendResponse, matchingVariant.getEntry())) {
final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(request);
scope.clientContext.setAttribute(HttpCoreContext.HTTP_REQUEST, unconditional);
callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
@Override
public void run() {
callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
}
});
} else {
callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
@Override
public void run() {
updateVariantCacheEntry(backendResponse, responseDate, matchingVariant);
}
});
}
}
}
}
callbackRef.set(callback);
return callback.handleResponse(backendResponse, entityDetails);
}
@Override
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
final AsyncExecCallback callback = callbackRef.getAndSet(null);
if (callback != null) {
callback.handleInformationResponse(response);
} else {
asyncExecCallback.handleInformationResponse(response);
}
}
@Override
public void completed() {
final AsyncExecCallback callback = callbackRef.getAndSet(null);
if (callback != null) {
callback.completed();
} else {
asyncExecCallback.completed();
}
}
@Override
public void failed(final Exception cause) {
final AsyncExecCallback callback = callbackRef.getAndSet(null);
if (callback != null) {
callback.failed(cause);
} else {
asyncExecCallback.failed(cause);
}
}
});
}
}