| /* |
| * ==================================================================== |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * ==================================================================== |
| * |
| * This software consists of voluntary contributions made by many |
| * individuals on behalf of the Apache Software Foundation. For more |
| * information on the Apache Software Foundation, please see |
| * <http://www.apache.org/>. |
| * |
| */ |
| package org.apache.hc.client5.http.impl.cache; |
| |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.nio.ByteBuffer; |
| import java.time.Instant; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Consumer; |
| |
| import org.apache.hc.client5.http.HttpRoute; |
| import org.apache.hc.client5.http.async.AsyncExecCallback; |
| import org.apache.hc.client5.http.async.AsyncExecChain; |
| import org.apache.hc.client5.http.async.AsyncExecChainHandler; |
| import org.apache.hc.client5.http.async.methods.SimpleBody; |
| import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; |
| import org.apache.hc.client5.http.cache.CacheResponseStatus; |
| import org.apache.hc.client5.http.cache.HttpCacheContext; |
| import org.apache.hc.client5.http.cache.HttpCacheEntry; |
| import org.apache.hc.client5.http.cache.RequestCacheControl; |
| import org.apache.hc.client5.http.cache.ResourceIOException; |
| import org.apache.hc.client5.http.cache.ResponseCacheControl; |
| import org.apache.hc.client5.http.impl.ExecSupport; |
| import org.apache.hc.client5.http.protocol.HttpClientContext; |
| import org.apache.hc.client5.http.schedule.SchedulingStrategy; |
| import org.apache.hc.client5.http.validator.ETag; |
| import org.apache.hc.core5.annotation.Contract; |
| import org.apache.hc.core5.annotation.ThreadingBehavior; |
| import org.apache.hc.core5.concurrent.CancellableDependency; |
| import org.apache.hc.core5.concurrent.ComplexFuture; |
| import org.apache.hc.core5.concurrent.FutureCallback; |
| import org.apache.hc.core5.http.ContentType; |
| import org.apache.hc.core5.http.EntityDetails; |
| import org.apache.hc.core5.http.Header; |
| import org.apache.hc.core5.http.HttpException; |
| import org.apache.hc.core5.http.HttpHeaders; |
| import org.apache.hc.core5.http.HttpHost; |
| import org.apache.hc.core5.http.HttpRequest; |
| import org.apache.hc.core5.http.HttpResponse; |
| import org.apache.hc.core5.http.HttpStatus; |
| import org.apache.hc.core5.http.impl.BasicEntityDetails; |
| import org.apache.hc.core5.http.message.RequestLine; |
| import org.apache.hc.core5.http.nio.AsyncDataConsumer; |
| import org.apache.hc.core5.http.nio.AsyncEntityProducer; |
| import org.apache.hc.core5.http.nio.CapacityChannel; |
| import org.apache.hc.core5.http.protocol.HttpCoreContext; |
| import org.apache.hc.core5.http.support.BasicRequestBuilder; |
| import org.apache.hc.core5.net.URIAuthority; |
| import org.apache.hc.core5.util.Args; |
| import org.apache.hc.core5.util.ByteArrayBuffer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Request executor in the request execution chain that is responsible for |
| * transparent client-side caching. |
| * <p> |
| * The current implementation is conditionally |
| * compliant with HTTP/1.1 (meaning all the MUST and MUST NOTs are obeyed), |
| * although quite a lot, though not all, of the SHOULDs and SHOULD NOTs |
| * are obeyed too. |
| * |
| * @since 5.0 |
| */ |
| @Contract(threading = ThreadingBehavior.SAFE) // So long as the responseCache implementation is threadsafe |
| class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(AsyncCachingExec.class); |
| private final HttpAsyncCache responseCache; |
| private final DefaultAsyncCacheRevalidator cacheRevalidator; |
| private final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder; |
| |
| AsyncCachingExec(final HttpAsyncCache cache, final DefaultAsyncCacheRevalidator cacheRevalidator, final CacheConfig config) { |
| super(config); |
| this.responseCache = Args.notNull(cache, "Response cache"); |
| this.cacheRevalidator = cacheRevalidator; |
| this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(request -> |
| BasicRequestBuilder.copy(request).build()); |
| } |
| |
| AsyncCachingExec( |
| final HttpAsyncCache cache, |
| final ScheduledExecutorService executorService, |
| final SchedulingStrategy schedulingStrategy, |
| final CacheConfig config) { |
| this(cache, |
| executorService != null ? new DefaultAsyncCacheRevalidator(executorService, schedulingStrategy) : null, |
| config); |
| } |
| |
| private void triggerResponse( |
| final SimpleHttpResponse cacheResponse, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecCallback asyncExecCallback) { |
| scope.execRuntime.releaseEndpoint(); |
| |
| final SimpleBody body = cacheResponse.getBody(); |
| final byte[] content = body != null ? body.getBodyBytes() : null; |
| final ContentType contentType = body != null ? body.getContentType() : null; |
| try { |
| final AsyncDataConsumer dataConsumer = asyncExecCallback.handleResponse( |
| cacheResponse, |
| content != null ? new BasicEntityDetails(content.length, contentType) : null); |
| if (dataConsumer != null) { |
| if (content != null) { |
| dataConsumer.consume(ByteBuffer.wrap(content)); |
| } |
| dataConsumer.streamEnd(null); |
| } |
| asyncExecCallback.completed(); |
| } catch (final HttpException | IOException ex) { |
| asyncExecCallback.failed(ex); |
| } |
| } |
| |
| static class AsyncExecCallbackWrapper implements AsyncExecCallback { |
| |
| private final Runnable command; |
| private final Consumer<Exception> exceptionConsumer; |
| |
| AsyncExecCallbackWrapper(final Runnable command, final Consumer<Exception> exceptionConsumer) { |
| this.command = command; |
| this.exceptionConsumer = exceptionConsumer; |
| } |
| |
| @Override |
| public AsyncDataConsumer handleResponse( |
| final HttpResponse response, |
| final EntityDetails entityDetails) throws HttpException, IOException { |
| return null; |
| } |
| |
| @Override |
| public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { |
| } |
| |
| @Override |
| public void completed() { |
| command.run(); |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| if (exceptionConsumer != null) { |
| exceptionConsumer.accept(cause); |
| } |
| } |
| |
| } |
| |
| @Override |
| public void execute( |
| final HttpRequest request, |
| final AsyncEntityProducer entityProducer, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecChain chain, |
| final AsyncExecCallback asyncExecCallback) throws HttpException, IOException { |
| Args.notNull(request, "HTTP request"); |
| Args.notNull(scope, "Scope"); |
| |
| final HttpRoute route = scope.route; |
| final HttpClientContext context = scope.clientContext; |
| |
| final URIAuthority authority = request.getAuthority(); |
| final String scheme = request.getScheme(); |
| final HttpHost target = authority != null ? new HttpHost(scheme, authority) : route.getTargetHost(); |
| doExecute(target, |
| request, |
| entityProducer, |
| scope, |
| chain, |
| new AsyncExecCallback() { |
| |
| @Override |
| public AsyncDataConsumer handleResponse( |
| final HttpResponse response, |
| final EntityDetails entityDetails) throws HttpException, IOException { |
| context.setAttribute(HttpCoreContext.HTTP_REQUEST, request); |
| context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response); |
| return asyncExecCallback.handleResponse(response, entityDetails); |
| } |
| |
| @Override |
| public void handleInformationResponse( |
| final HttpResponse response) throws HttpException, IOException { |
| asyncExecCallback.handleInformationResponse(response); |
| } |
| |
| @Override |
| public void completed() { |
| asyncExecCallback.completed(); |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| asyncExecCallback.failed(cause); |
| } |
| |
| }); |
| } |
| |
| public void doExecute( |
| final HttpHost target, |
| final HttpRequest request, |
| final AsyncEntityProducer entityProducer, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecChain chain, |
| final AsyncExecCallback asyncExecCallback) throws HttpException, IOException { |
| |
| final String exchangeId = scope.exchangeId; |
| final HttpCacheContext context = HttpCacheContext.adapt(scope.clientContext); |
| final CancellableDependency operation = scope.cancellableDependency; |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} request via cache: {}", exchangeId, new RequestLine(request)); |
| } |
| |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MISS); |
| context.setAttribute(HttpCacheContext.CACHE_ENTRY, null); |
| |
| if (clientRequestsOurOptions(request)) { |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE); |
| triggerResponse(SimpleHttpResponse.create(HttpStatus.SC_NOT_IMPLEMENTED), scope, asyncExecCallback); |
| return; |
| } |
| |
| final RequestCacheControl requestCacheControl; |
| if (request.containsHeader(HttpHeaders.CACHE_CONTROL)) { |
| requestCacheControl = CacheControlHeaderParser.INSTANCE.parse(request); |
| context.setRequestCacheControl(requestCacheControl); |
| } else { |
| requestCacheControl = context.getRequestCacheControl(); |
| CacheControlHeaderGenerator.INSTANCE.generate(requestCacheControl, request); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} request cache control: {}", exchangeId, requestCacheControl); |
| } |
| |
| if (cacheableRequestPolicy.canBeServedFromCache(requestCacheControl, request)) { |
| operation.setDependency(responseCache.match(target, request, new FutureCallback<CacheMatch>() { |
| |
| @Override |
| public void completed(final CacheMatch result) { |
| final CacheHit hit = result != null ? result.hit : null; |
| final CacheHit root = result != null ? result.root : null; |
| if (hit == null) { |
| handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback); |
| } else { |
| final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} response cache control: {}", exchangeId, responseCacheControl); |
| } |
| context.setResponseCacheControl(responseCacheControl); |
| handleCacheHit(requestCacheControl, responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| asyncExecCallback.failed(cause); |
| } |
| |
| @Override |
| public void cancelled() { |
| asyncExecCallback.failed(new InterruptedIOException()); |
| } |
| |
| })); |
| |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} request cannot be served from cache", exchangeId); |
| } |
| callBackend(target, request, entityProducer, scope, chain, asyncExecCallback); |
| } |
| } |
| |
| void chainProceed( |
| final HttpRequest request, |
| final AsyncEntityProducer entityProducer, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecChain chain, |
| final AsyncExecCallback asyncExecCallback) { |
| try { |
| chain.proceed(request, entityProducer, scope, asyncExecCallback); |
| } catch (final HttpException | IOException ex) { |
| asyncExecCallback.failed(ex); |
| } |
| } |
| |
| void callBackend( |
| final HttpHost target, |
| final HttpRequest request, |
| final AsyncEntityProducer entityProducer, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecChain chain, |
| final AsyncExecCallback asyncExecCallback) { |
| final String exchangeId = scope.exchangeId; |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} calling the backend", exchangeId); |
| } |
| final Instant requestDate = getCurrentDate(); |
| final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>(); |
| chainProceed(request, entityProducer, scope, chain, new AsyncExecCallback() { |
| |
| @Override |
| public AsyncDataConsumer handleResponse( |
| final HttpResponse backendResponse, |
| final EntityDetails entityDetails) throws HttpException, IOException { |
| final Instant responseDate = getCurrentDate(); |
| final AsyncExecCallback callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback); |
| callbackRef.set(callback); |
| return callback.handleResponse(backendResponse, entityDetails); |
| } |
| |
| @Override |
| public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { |
| final AsyncExecCallback callback = callbackRef.getAndSet(null); |
| if (callback != null) { |
| callback.handleInformationResponse(response); |
| } else { |
| asyncExecCallback.handleInformationResponse(response); |
| } |
| } |
| |
| @Override |
| public void completed() { |
| final AsyncExecCallback callback = callbackRef.getAndSet(null); |
| if (callback != null) { |
| callback.completed(); |
| } else { |
| asyncExecCallback.completed(); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| final AsyncExecCallback callback = callbackRef.getAndSet(null); |
| if (callback != null) { |
| callback.failed(cause); |
| } else { |
| asyncExecCallback.failed(cause); |
| } |
| } |
| |
| }); |
| } |
| |
| class CachingAsyncDataConsumer implements AsyncDataConsumer { |
| |
| private final String exchangeId; |
| private final AsyncExecCallback fallback; |
| private final HttpResponse backendResponse; |
| private final EntityDetails entityDetails; |
| private final AtomicBoolean writtenThrough; |
| private final AtomicReference<ByteArrayBuffer> bufferRef; |
| private final AtomicReference<AsyncDataConsumer> dataConsumerRef; |
| |
| CachingAsyncDataConsumer( |
| final String exchangeId, |
| final AsyncExecCallback fallback, |
| final HttpResponse backendResponse, |
| final EntityDetails entityDetails) { |
| this.exchangeId = exchangeId; |
| this.fallback = fallback; |
| this.backendResponse = backendResponse; |
| this.entityDetails = entityDetails; |
| this.writtenThrough = new AtomicBoolean(false); |
| this.bufferRef = new AtomicReference<>(entityDetails != null ? new ByteArrayBuffer(1024) : null); |
| this.dataConsumerRef = new AtomicReference<>(); |
| } |
| |
| @Override |
| public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException { |
| final AsyncDataConsumer dataConsumer = dataConsumerRef.get(); |
| if (dataConsumer != null) { |
| dataConsumer.updateCapacity(capacityChannel); |
| } else { |
| capacityChannel.update(Integer.MAX_VALUE); |
| } |
| } |
| |
| @Override |
| public final void consume(final ByteBuffer src) throws IOException { |
| final ByteArrayBuffer buffer = bufferRef.get(); |
| if (buffer != null) { |
| if (src.hasArray()) { |
| buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining()); |
| } else { |
| while (src.hasRemaining()) { |
| buffer.append(src.get()); |
| } |
| } |
| if (buffer.length() > cacheConfig.getMaxObjectSize()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} backend response content length exceeds maximum", exchangeId); |
| } |
| // Over the max limit. Stop buffering and forward the response |
| // along with all the data buffered so far to the caller. |
| bufferRef.set(null); |
| try { |
| final AsyncDataConsumer dataConsumer = fallback.handleResponse(backendResponse, entityDetails); |
| if (dataConsumer != null) { |
| dataConsumerRef.set(dataConsumer); |
| writtenThrough.set(true); |
| dataConsumer.consume(ByteBuffer.wrap(buffer.array(), 0, buffer.length())); |
| } |
| } catch (final HttpException ex) { |
| fallback.failed(ex); |
| } |
| } |
| } else { |
| final AsyncDataConsumer dataConsumer = dataConsumerRef.get(); |
| if (dataConsumer != null) { |
| dataConsumer.consume(src); |
| } |
| } |
| } |
| |
| @Override |
| public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException { |
| final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null); |
| if (dataConsumer != null) { |
| dataConsumer.streamEnd(trailers); |
| } |
| } |
| |
| @Override |
| public void releaseResources() { |
| final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null); |
| if (dataConsumer != null) { |
| dataConsumer.releaseResources(); |
| } |
| } |
| |
| } |
| |
| class BackendResponseHandler implements AsyncExecCallback { |
| |
| private final HttpHost target; |
| private final HttpRequest request; |
| private final Instant requestDate; |
| private final Instant responseDate; |
| private final AsyncExecChain.Scope scope; |
| private final AsyncExecCallback asyncExecCallback; |
| private final AtomicReference<CachingAsyncDataConsumer> cachingConsumerRef; |
| |
| BackendResponseHandler( |
| final HttpHost target, |
| final HttpRequest request, |
| final Instant requestDate, |
| final Instant responseDate, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecCallback asyncExecCallback) { |
| this.target = target; |
| this.request = request; |
| this.requestDate = requestDate; |
| this.responseDate = responseDate; |
| this.scope = scope; |
| this.asyncExecCallback = asyncExecCallback; |
| this.cachingConsumerRef = new AtomicReference<>(); |
| } |
| |
| @Override |
| public AsyncDataConsumer handleResponse( |
| final HttpResponse backendResponse, |
| final EntityDetails entityDetails) throws HttpException, IOException { |
| final String exchangeId = scope.exchangeId; |
| responseCache.evictInvalidatedEntries(target, request, backendResponse, new FutureCallback<Boolean>() { |
| |
| @Override |
| public void completed(final Boolean result) { |
| } |
| |
| @Override |
| public void failed(final Exception ex) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} unable to flush invalidated entries from cache", exchangeId, ex); |
| } |
| } |
| |
| @Override |
| public void cancelled() { |
| } |
| |
| }); |
| if (isResponseTooBig(entityDetails)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} backend response is known to be too big", exchangeId); |
| } |
| return asyncExecCallback.handleResponse(backendResponse, entityDetails); |
| } |
| |
| final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(backendResponse); |
| final boolean cacheable = responseCachingPolicy.isResponseCacheable(responseCacheControl, request, backendResponse); |
| if (cacheable) { |
| storeRequestIfModifiedSinceFor304Response(request, backendResponse); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} caching backend response", exchangeId); |
| } |
| final CachingAsyncDataConsumer cachingDataConsumer = new CachingAsyncDataConsumer( |
| exchangeId, asyncExecCallback, backendResponse, entityDetails); |
| cachingConsumerRef.set(cachingDataConsumer); |
| return cachingDataConsumer; |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} backend response is not cacheable", exchangeId); |
| } |
| return asyncExecCallback.handleResponse(backendResponse, entityDetails); |
| } |
| } |
| |
| @Override |
| public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { |
| asyncExecCallback.handleInformationResponse(response); |
| } |
| |
| void triggerNewCacheEntryResponse(final HttpResponse backendResponse, final Instant responseDate, final ByteArrayBuffer buffer) { |
| final String exchangeId = scope.exchangeId; |
| final HttpClientContext context = scope.clientContext; |
| final CancellableDependency operation = scope.cancellableDependency; |
| operation.setDependency(responseCache.store( |
| target, |
| request, |
| backendResponse, |
| buffer, |
| requestDate, |
| responseDate, |
| new FutureCallback<CacheHit>() { |
| |
| @Override |
| public void completed(final CacheHit hit) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} backend response successfully cached", exchangeId); |
| } |
| try { |
| final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, hit.entry); |
| context.setAttribute(HttpCacheContext.CACHE_ENTRY, hit.entry); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } catch (final ResourceIOException ex) { |
| asyncExecCallback.failed(ex); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception ex) { |
| asyncExecCallback.failed(ex); |
| } |
| |
| @Override |
| public void cancelled() { |
| asyncExecCallback.failed(new InterruptedIOException()); |
| } |
| |
| })); |
| |
| } |
| |
| void triggerCachedResponse(final HttpCacheEntry entry) { |
| final HttpClientContext context = scope.clientContext; |
| try { |
| final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, entry); |
| context.setAttribute(HttpCacheContext.CACHE_ENTRY, entry); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } catch (final ResourceIOException ex) { |
| asyncExecCallback.failed(ex); |
| } |
| } |
| |
| @Override |
| public void completed() { |
| final String exchangeId = scope.exchangeId; |
| final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.getAndSet(null); |
| if (cachingDataConsumer == null || cachingDataConsumer.writtenThrough.get()) { |
| asyncExecCallback.completed(); |
| return; |
| } |
| final HttpResponse backendResponse = cachingDataConsumer.backendResponse; |
| final ByteArrayBuffer buffer = cachingDataConsumer.bufferRef.getAndSet(null); |
| |
| // Handle 304 Not Modified responses |
| if (backendResponse.getCode() == HttpStatus.SC_NOT_MODIFIED) { |
| responseCache.match(target, request, new FutureCallback<CacheMatch>() { |
| |
| @Override |
| public void completed(final CacheMatch result) { |
| final CacheHit hit = result != null ? result.hit : null; |
| if (hit != null) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} existing cache entry found, updating cache entry", exchangeId); |
| } |
| responseCache.update( |
| hit, |
| target, |
| request, |
| backendResponse, |
| requestDate, |
| responseDate, |
| new FutureCallback<CacheHit>() { |
| |
| @Override |
| public void completed(final CacheHit updated) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} cache entry updated, generating response from updated entry", exchangeId); |
| } |
| triggerCachedResponse(updated.entry); |
| } |
| @Override |
| public void failed(final Exception cause) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} request failed: {}", exchangeId, cause.getMessage()); |
| } |
| asyncExecCallback.failed(cause); |
| } |
| |
| @Override |
| public void cancelled() { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} cache entry updated aborted", exchangeId); |
| } |
| asyncExecCallback.failed(new InterruptedIOException()); |
| } |
| |
| }); |
| } else { |
| triggerNewCacheEntryResponse(backendResponse, responseDate, buffer); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| asyncExecCallback.failed(cause); |
| } |
| |
| @Override |
| public void cancelled() { |
| asyncExecCallback.failed(new InterruptedIOException()); |
| } |
| |
| }); |
| } else { |
| if (cacheConfig.isFreshnessCheckEnabled()) { |
| final CancellableDependency operation = scope.cancellableDependency; |
| operation.setDependency(responseCache.match(target, request, new FutureCallback<CacheMatch>() { |
| |
| @Override |
| public void completed(final CacheMatch result) { |
| final CacheHit hit = result != null ? result.hit : null; |
| if (HttpCacheEntry.isNewer(hit != null ? hit.entry : null, backendResponse)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} backend already contains fresher cache entry", exchangeId); |
| } |
| triggerCachedResponse(hit.entry); |
| } else { |
| triggerNewCacheEntryResponse(backendResponse, responseDate, buffer); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| asyncExecCallback.failed(cause); |
| } |
| |
| @Override |
| public void cancelled() { |
| asyncExecCallback.failed(new InterruptedIOException()); |
| } |
| |
| })); |
| } else { |
| triggerNewCacheEntryResponse(backendResponse, responseDate, buffer); |
| } |
| } |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| asyncExecCallback.failed(cause); |
| } |
| |
| } |
| |
| private void handleCacheHit( |
| final RequestCacheControl requestCacheControl, |
| final ResponseCacheControl responseCacheControl, |
| final CacheHit hit, |
| final HttpHost target, |
| final HttpRequest request, |
| final AsyncEntityProducer entityProducer, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecChain chain, |
| final AsyncExecCallback asyncExecCallback) { |
| final HttpClientContext context = scope.clientContext; |
| final String exchangeId = scope.exchangeId; |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} cache hit: {}", exchangeId, new RequestLine(request)); |
| } |
| |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_HIT); |
| cacheHits.getAndIncrement(); |
| |
| final Instant now = getCurrentDate(); |
| |
| final CacheSuitability cacheSuitability = suitabilityChecker.assessSuitability(requestCacheControl, responseCacheControl, request, hit.entry, now); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} cache suitability: {}", exchangeId, cacheSuitability); |
| } |
| if (cacheSuitability == CacheSuitability.FRESH || cacheSuitability == CacheSuitability.FRESH_ENOUGH) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} cache hit is fresh enough", exchangeId); |
| } |
| try { |
| final SimpleHttpResponse cacheResponse = generateCachedResponse(request, hit.entry, now); |
| context.setAttribute(HttpCacheContext.CACHE_ENTRY, hit.entry); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } catch (final ResourceIOException ex) { |
| if (requestCacheControl.isOnlyIfCached()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} request marked only-if-cached", exchangeId); |
| } |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE); |
| final SimpleHttpResponse cacheResponse = generateGatewayTimeout(); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } else { |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.FAILURE); |
| try { |
| chain.proceed(request, entityProducer, scope, asyncExecCallback); |
| } catch (final HttpException | IOException ex2) { |
| asyncExecCallback.failed(ex2); |
| } |
| } |
| } |
| } else { |
| if (requestCacheControl.isOnlyIfCached()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} cache entry not is not fresh and only-if-cached requested", exchangeId); |
| } |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE); |
| final SimpleHttpResponse cacheResponse = generateGatewayTimeout(); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } else if (cacheSuitability == CacheSuitability.MISMATCH) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} cache entry does not match the request; calling backend", exchangeId); |
| } |
| callBackend(target, request, entityProducer, scope, chain, asyncExecCallback); |
| } else if (entityProducer != null && !entityProducer.isRepeatable()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} request is not repeatable; calling backend", exchangeId); |
| } |
| callBackend(target, request, entityProducer, scope, chain, asyncExecCallback); |
| } else if (hit.entry.getStatus() == HttpStatus.SC_NOT_MODIFIED && !suitabilityChecker.isConditional(request)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} non-modified cache entry does not match the non-conditional request; calling backend", exchangeId); |
| } |
| callBackend(target, request, entityProducer, scope, chain, asyncExecCallback); |
| } else if (cacheSuitability == CacheSuitability.REVALIDATION_REQUIRED) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} revalidation required; revalidating cache entry", exchangeId); |
| } |
| revalidateCacheEntryWithoutFallback(responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback); |
| } else if (cacheSuitability == CacheSuitability.STALE_WHILE_REVALIDATED) { |
| if (cacheRevalidator != null) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} serving stale with asynchronous revalidation", exchangeId); |
| } |
| try { |
| final String revalidationExchangeId = ExecSupport.getNextExchangeId(); |
| context.setExchangeId(revalidationExchangeId); |
| final AsyncExecChain.Scope fork = new AsyncExecChain.Scope( |
| revalidationExchangeId, |
| scope.route, |
| scope.originalRequest, |
| new ComplexFuture<>(null), |
| HttpClientContext.create(), |
| scope.execRuntime.fork(), |
| scope.scheduler, |
| scope.execCount); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} starting asynchronous revalidation exchange {}", exchangeId, revalidationExchangeId); |
| } |
| cacheRevalidator.revalidateCacheEntry( |
| hit.getEntryKey(), |
| asyncExecCallback, |
| c -> revalidateCacheEntry(responseCacheControl, hit, target, request, entityProducer, fork, chain, c)); |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE); |
| final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, hit.entry); |
| context.setAttribute(HttpCacheContext.CACHE_ENTRY, hit.entry); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } catch (final IOException ex) { |
| asyncExecCallback.failed(ex); |
| } |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} revalidating stale cache entry (asynchronous revalidation disabled)", exchangeId); |
| } |
| revalidateCacheEntryWithFallback(requestCacheControl, responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback); |
| } |
| } else if (cacheSuitability == CacheSuitability.STALE) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} revalidating stale cache entry", exchangeId); |
| } |
| revalidateCacheEntryWithFallback(requestCacheControl, responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} cache entry not usable; calling backend", exchangeId); |
| } |
| callBackend(target, request, entityProducer, scope, chain, asyncExecCallback); |
| } |
| } |
| } |
| |
| void revalidateCacheEntry( |
| final ResponseCacheControl responseCacheControl, |
| final CacheHit hit, |
| final HttpHost target, |
| final HttpRequest request, |
| final AsyncEntityProducer entityProducer, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecChain chain, |
| final AsyncExecCallback asyncExecCallback) { |
| final Instant requestDate = getCurrentDate(); |
| final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequest( |
| responseCacheControl, |
| BasicRequestBuilder.copy(request).build(), |
| hit.entry); |
| final HttpClientContext context = scope.clientContext; |
| chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() { |
| |
| final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>(); |
| |
| void triggerUpdatedCacheEntryResponse(final HttpResponse backendResponse, final Instant responseDate) { |
| final CancellableDependency operation = scope.cancellableDependency; |
| operation.setDependency(responseCache.update( |
| hit, |
| target, |
| request, |
| backendResponse, |
| requestDate, |
| responseDate, |
| new FutureCallback<CacheHit>() { |
| |
| @Override |
| public void completed(final CacheHit updated) { |
| try { |
| final SimpleHttpResponse cacheResponse = generateCachedResponse(request, updated.entry, responseDate); |
| context.setAttribute(HttpCacheContext.CACHE_ENTRY, hit.entry); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } catch (final ResourceIOException ex) { |
| asyncExecCallback.failed(ex); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception ex) { |
| asyncExecCallback.failed(ex); |
| } |
| |
| @Override |
| public void cancelled() { |
| asyncExecCallback.failed(new InterruptedIOException()); |
| } |
| |
| })); |
| } |
| |
| AsyncExecCallback evaluateResponse(final HttpResponse backendResponse, final Instant responseDate) { |
| final int statusCode = backendResponse.getCode(); |
| if (statusCode == HttpStatus.SC_NOT_MODIFIED || statusCode == HttpStatus.SC_OK) { |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.VALIDATED); |
| cacheUpdates.getAndIncrement(); |
| } |
| if (statusCode == HttpStatus.SC_NOT_MODIFIED) { |
| return new AsyncExecCallbackWrapper(() -> triggerUpdatedCacheEntryResponse(backendResponse, responseDate), asyncExecCallback::failed); |
| } |
| return new BackendResponseHandler(target, conditionalRequest, requestDate, responseDate, scope, asyncExecCallback); |
| } |
| |
| @Override |
| public AsyncDataConsumer handleResponse( |
| final HttpResponse backendResponse1, |
| final EntityDetails entityDetails) throws HttpException, IOException { |
| |
| final Instant responseDate = getCurrentDate(); |
| |
| final AsyncExecCallback callback1; |
| if (HttpCacheEntry.isNewer(hit.entry, backendResponse1)) { |
| |
| final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest( |
| BasicRequestBuilder.copy(scope.originalRequest).build()); |
| |
| callback1 = new AsyncExecCallbackWrapper(() -> chainProceed(unconditional, entityProducer, scope, chain, new AsyncExecCallback() { |
| |
| @Override |
| public AsyncDataConsumer handleResponse( |
| final HttpResponse backendResponse2, |
| final EntityDetails entityDetails1) throws HttpException, IOException { |
| final Instant responseDate2 = getCurrentDate(); |
| final AsyncExecCallback callback2 = evaluateResponse(backendResponse2, responseDate2); |
| callbackRef.set(callback2); |
| return callback2.handleResponse(backendResponse2, entityDetails1); |
| } |
| |
| @Override |
| public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { |
| final AsyncExecCallback callback2 = callbackRef.getAndSet(null); |
| if (callback2 != null) { |
| callback2.handleInformationResponse(response); |
| } else { |
| asyncExecCallback.handleInformationResponse(response); |
| } |
| } |
| |
| @Override |
| public void completed() { |
| final AsyncExecCallback callback2 = callbackRef.getAndSet(null); |
| if (callback2 != null) { |
| callback2.completed(); |
| } else { |
| asyncExecCallback.completed(); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| final AsyncExecCallback callback2 = callbackRef.getAndSet(null); |
| if (callback2 != null) { |
| callback2.failed(cause); |
| } else { |
| asyncExecCallback.failed(cause); |
| } |
| } |
| |
| }), asyncExecCallback::failed); |
| } else { |
| callback1 = evaluateResponse(backendResponse1, responseDate); |
| } |
| callbackRef.set(callback1); |
| return callback1.handleResponse(backendResponse1, entityDetails); |
| } |
| |
| @Override |
| public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { |
| final AsyncExecCallback callback1 = callbackRef.getAndSet(null); |
| if (callback1 != null) { |
| callback1.handleInformationResponse(response); |
| } else { |
| asyncExecCallback.handleInformationResponse(response); |
| } |
| } |
| |
| @Override |
| public void completed() { |
| final AsyncExecCallback callback1 = callbackRef.getAndSet(null); |
| if (callback1 != null) { |
| callback1.completed(); |
| } else { |
| asyncExecCallback.completed(); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| final AsyncExecCallback callback1 = callbackRef.getAndSet(null); |
| if (callback1 != null) { |
| callback1.failed(cause); |
| } else { |
| asyncExecCallback.failed(cause); |
| } |
| } |
| |
| }); |
| |
| } |
| |
| void revalidateCacheEntryWithoutFallback( |
| final ResponseCacheControl responseCacheControl, |
| final CacheHit hit, |
| final HttpHost target, |
| final HttpRequest request, |
| final AsyncEntityProducer entityProducer, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecChain chain, |
| final AsyncExecCallback asyncExecCallback) { |
| final String exchangeId = scope.exchangeId; |
| final HttpClientContext context = scope.clientContext; |
| revalidateCacheEntry(responseCacheControl, hit, target, request, entityProducer, scope, chain, new AsyncExecCallback() { |
| |
| private final AtomicBoolean committed = new AtomicBoolean(); |
| |
| @Override |
| public AsyncDataConsumer handleResponse(final HttpResponse response, |
| final EntityDetails entityDetails) throws HttpException, IOException { |
| committed.set(true); |
| return asyncExecCallback.handleResponse(response, entityDetails); |
| } |
| |
| @Override |
| public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { |
| asyncExecCallback.handleInformationResponse(response); |
| } |
| |
| @Override |
| public void completed() { |
| asyncExecCallback.completed(); |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| if (!committed.get() && cause instanceof IOException) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} I/O error while revalidating cache entry", exchangeId, cause); |
| } |
| final SimpleHttpResponse cacheResponse = generateGatewayTimeout(); |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } else { |
| asyncExecCallback.failed(cause); |
| } |
| } |
| |
| }); |
| } |
| |
| void revalidateCacheEntryWithFallback( |
| final RequestCacheControl requestCacheControl, |
| final ResponseCacheControl responseCacheControl, |
| final CacheHit hit, |
| final HttpHost target, |
| final HttpRequest request, |
| final AsyncEntityProducer entityProducer, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecChain chain, |
| final AsyncExecCallback asyncExecCallback) { |
| final String exchangeId = scope.exchangeId; |
| final HttpClientContext context = scope.clientContext; |
| revalidateCacheEntry(responseCacheControl, hit, target, request, entityProducer, scope, chain, new AsyncExecCallback() { |
| |
| private final AtomicReference<HttpResponse> committed = new AtomicReference<>(); |
| |
| @Override |
| public AsyncDataConsumer handleResponse(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException { |
| final int status = response.getCode(); |
| if (staleIfErrorAppliesTo(status) && |
| suitabilityChecker.isSuitableIfError(requestCacheControl, responseCacheControl, hit.entry, getCurrentDate())) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} serving stale response due to {} status and stale-if-error enabled", exchangeId, status); |
| } |
| return null; |
| } else { |
| committed.set(response); |
| return asyncExecCallback.handleResponse(response, entityDetails); |
| } |
| } |
| |
| @Override |
| public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { |
| asyncExecCallback.handleInformationResponse(response); |
| } |
| |
| @Override |
| public void completed() { |
| final HttpResponse response = committed.get(); |
| if (response == null) { |
| try { |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE); |
| final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, hit.entry); |
| context.setAttribute(HttpCacheContext.CACHE_ENTRY, hit.entry); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } catch (final IOException ex) { |
| asyncExecCallback.failed(ex); |
| } |
| } else { |
| asyncExecCallback.completed(); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| final HttpResponse response = committed.get(); |
| if (response == null) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} I/O error while revalidating cache entry", exchangeId, cause); |
| } |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE); |
| if (cause instanceof IOException && |
| suitabilityChecker.isSuitableIfError(requestCacheControl, responseCacheControl, hit.entry, getCurrentDate())) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} serving stale response due to IOException and stale-if-error enabled", exchangeId); |
| } |
| try { |
| final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, hit.entry); |
| context.setAttribute(HttpCacheContext.CACHE_ENTRY, hit.entry); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } catch (final IOException ex) { |
| asyncExecCallback.failed(cause); |
| } |
| } else { |
| final SimpleHttpResponse cacheResponse = generateGatewayTimeout(); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } |
| } else { |
| asyncExecCallback.failed(cause); |
| } |
| } |
| |
| }); |
| } |
| private void handleCacheMiss( |
| final RequestCacheControl requestCacheControl, |
| final CacheHit partialMatch, |
| final HttpHost target, |
| final HttpRequest request, |
| final AsyncEntityProducer entityProducer, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecChain chain, |
| final AsyncExecCallback asyncExecCallback) { |
| final String exchangeId = scope.exchangeId; |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} cache miss: {}", exchangeId, new RequestLine(request)); |
| } |
| cacheMisses.getAndIncrement(); |
| |
| final CancellableDependency operation = scope.cancellableDependency; |
| if (requestCacheControl.isOnlyIfCached()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} request marked only-if-cached", exchangeId); |
| } |
| final HttpClientContext context = scope.clientContext; |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE); |
| final SimpleHttpResponse cacheResponse = generateGatewayTimeout(); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } |
| |
| if (partialMatch != null && partialMatch.entry.hasVariants() && entityProducer == null) { |
| operation.setDependency(responseCache.getVariants( |
| partialMatch, |
| new FutureCallback<Collection<CacheHit>>() { |
| |
| @Override |
| public void completed(final Collection<CacheHit> variants) { |
| if (variants != null && !variants.isEmpty()) { |
| negotiateResponseFromVariants(target, request, entityProducer, scope, chain, asyncExecCallback, variants); |
| } else { |
| callBackend(target, request, entityProducer, scope, chain, asyncExecCallback); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception ex) { |
| asyncExecCallback.failed(ex); |
| } |
| |
| @Override |
| public void cancelled() { |
| asyncExecCallback.failed(new InterruptedIOException()); |
| } |
| |
| })); |
| } else { |
| callBackend(target, request, entityProducer, scope, chain, asyncExecCallback); |
| } |
| } |
| |
| void negotiateResponseFromVariants( |
| final HttpHost target, |
| final HttpRequest request, |
| final AsyncEntityProducer entityProducer, |
| final AsyncExecChain.Scope scope, |
| final AsyncExecChain chain, |
| final AsyncExecCallback asyncExecCallback, |
| final Collection<CacheHit> variants) { |
| final String exchangeId = scope.exchangeId; |
| final CancellableDependency operation = scope.cancellableDependency; |
| final Map<ETag, CacheHit> variantMap = new HashMap<>(); |
| for (final CacheHit variant : variants) { |
| final ETag eTag = variant.entry.getETag(); |
| if (eTag != null) { |
| variantMap.put(eTag, variant); |
| } |
| } |
| |
| final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequestFromVariants( |
| request, |
| variantMap.keySet()); |
| |
| final Instant requestDate = getCurrentDate(); |
| chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() { |
| |
| final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>(); |
| |
| void updateVariantCacheEntry(final HttpResponse backendResponse, final Instant responseDate, final CacheHit match) { |
| final HttpClientContext context = scope.clientContext; |
| context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.VALIDATED); |
| cacheUpdates.getAndIncrement(); |
| |
| operation.setDependency(responseCache.storeFromNegotiated( |
| match, |
| target, |
| request, |
| backendResponse, |
| requestDate, |
| responseDate, |
| new FutureCallback<CacheHit>() { |
| |
| @Override |
| public void completed(final CacheHit hit) { |
| try { |
| final SimpleHttpResponse cacheResponse = generateCachedResponse(request, hit.entry, responseDate); |
| context.setAttribute(HttpCacheContext.CACHE_ENTRY, hit.entry); |
| triggerResponse(cacheResponse, scope, asyncExecCallback); |
| } catch (final ResourceIOException ex) { |
| asyncExecCallback.failed(ex); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception ex) { |
| asyncExecCallback.failed(ex); |
| } |
| |
| @Override |
| public void cancelled() { |
| asyncExecCallback.failed(new InterruptedIOException()); |
| } |
| |
| })); |
| } |
| |
| @Override |
| public AsyncDataConsumer handleResponse( |
| final HttpResponse backendResponse, |
| final EntityDetails entityDetails) throws HttpException, IOException { |
| final HttpClientContext context = scope.clientContext; |
| final Instant responseDate = getCurrentDate(); |
| final AsyncExecCallback callback; |
| if (backendResponse.getCode() != HttpStatus.SC_NOT_MODIFIED) { |
| callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback); |
| } else { |
| final ETag resultEtag = ETag.get(backendResponse); |
| if (resultEtag == null) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} 304 response did not contain ETag", exchangeId); |
| } |
| callback = new AsyncExecCallbackWrapper(() -> callBackend(target, request, entityProducer, scope, chain, asyncExecCallback), asyncExecCallback::failed); |
| } else { |
| final CacheHit match = variantMap.get(resultEtag); |
| if (match == null) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} 304 response did not contain ETag matching one sent in If-None-Match", exchangeId); |
| } |
| callback = new AsyncExecCallbackWrapper(() -> callBackend(target, request, entityProducer, scope, chain, asyncExecCallback), asyncExecCallback::failed); |
| } else { |
| if (HttpCacheEntry.isNewer(match.entry, backendResponse)) { |
| final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest( |
| BasicRequestBuilder.copy(request).build()); |
| callback = new AsyncExecCallbackWrapper(() -> callBackend(target, unconditional, entityProducer, scope, chain, asyncExecCallback), asyncExecCallback::failed); |
| } else { |
| callback = new AsyncExecCallbackWrapper(() -> updateVariantCacheEntry(backendResponse, responseDate, match), asyncExecCallback::failed); |
| } |
| } |
| } |
| } |
| callbackRef.set(callback); |
| return callback.handleResponse(backendResponse, entityDetails); |
| } |
| |
| @Override |
| public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { |
| final AsyncExecCallback callback = callbackRef.getAndSet(null); |
| if (callback != null) { |
| callback.handleInformationResponse(response); |
| } else { |
| asyncExecCallback.handleInformationResponse(response); |
| } |
| } |
| |
| @Override |
| public void completed() { |
| final AsyncExecCallback callback = callbackRef.getAndSet(null); |
| if (callback != null) { |
| callback.completed(); |
| } else { |
| asyncExecCallback.completed(); |
| } |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| final AsyncExecCallback callback = callbackRef.getAndSet(null); |
| if (callback != null) { |
| callback.failed(cause); |
| } else { |
| asyncExecCallback.failed(cause); |
| } |
| } |
| |
| }); |
| |
| } |
| |
| } |