blob: 924f47bed43e20761321215bc2f6617219e68ab6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.olingo.client.core.communication.request;
import java.io.IOException;
import java.net.URI;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.DecompressingHttpClient;
import org.apache.http.util.EntityUtils;
import org.apache.olingo.client.api.ODataClient;
import org.apache.olingo.client.api.communication.ODataClientErrorException;
import org.apache.olingo.client.api.communication.header.ODataPreferences;
import org.apache.olingo.client.api.communication.request.AsyncRequestWrapper;
import org.apache.olingo.client.api.communication.request.ODataRequest;
import org.apache.olingo.client.api.communication.request.cud.ODataDeleteRequest;
import org.apache.olingo.client.api.communication.response.AsyncResponseWrapper;
import org.apache.olingo.client.api.communication.response.ODataDeleteResponse;
import org.apache.olingo.client.api.communication.response.ODataResponse;
import org.apache.olingo.client.api.http.HttpClientException;
import org.apache.olingo.commons.api.http.HttpHeader;
import org.apache.olingo.commons.api.http.HttpMethod;
import org.apache.olingo.commons.api.http.HttpStatusCode;
public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRequest
implements AsyncRequestWrapper<R> {
protected static final int MAX_RETRY = 5;
protected final ODataClient odataClient;
/**
* Request to be wrapped.
*/
protected final ODataRequest odataRequest;
/**
* HTTP client.
*/
protected final HttpClient httpClient;
/**
* HTTP request.
*/
protected final HttpUriRequest request;
/**
* Target URI.
*/
protected final URI uri;
protected AsyncRequestWrapperImpl(final ODataClient odataClient, final ODataRequest odataRequest) {
this.odataRequest = odataRequest;
this.odataRequest.setAccept(this.odataRequest.getAccept());
this.odataRequest.setContentType(this.odataRequest.getContentType());
extendHeader(HttpHeader.PREFER, new ODataPreferences().respondAsync());
this.odataClient = odataClient;
final HttpMethod method = odataRequest.getMethod();
// target uri
this.uri = odataRequest.getURI();
Objects.requireNonNull(this.uri, "Target URI can't be null");
HttpClient _httpClient = odataClient.getConfiguration().getHttpClientFactory().create(method, this.uri);
if (odataClient.getConfiguration().isGzipCompression()) {
_httpClient = new DecompressingHttpClient(_httpClient);
}
this.httpClient = _httpClient;
this.request = odataClient.getConfiguration().getHttpUriRequestFactory().create(method, this.uri);
if (request instanceof HttpEntityEnclosingRequestBase && odataRequest instanceof AbstractODataBasicRequest) {
AbstractODataBasicRequest<?> br = (AbstractODataBasicRequest<?>) odataRequest;
HttpEntityEnclosingRequestBase httpRequest = ((HttpEntityEnclosingRequestBase) request);
httpRequest.setEntity(new InputStreamEntity(br.getPayload(), -1));
}
}
@Override
public final AsyncRequestWrapper<R> wait(final int waitInSeconds) {
extendHeader(HttpHeader.PREFER, new ODataPreferences().wait(waitInSeconds));
return this;
}
@Override
public final AsyncRequestWrapper<R> callback(URI url) {
extendHeader(HttpHeader.PREFER, new ODataPreferences().callback(url.toASCIIString()));
return this;
}
protected final void extendHeader(final String headerName, final String headerValue) {
final StringBuilder extended = new StringBuilder();
if (this.odataRequest.getHeaderNames().contains(headerName)) {
extended.append(this.odataRequest.getHeader(headerName)).append(", ");
}
this.odataRequest.addCustomHeader(headerName, extended.append(headerValue).toString());
}
@Override
public AsyncResponseWrapper<R> execute() {
return new AsyncResponseWrapperImpl(doExecute());
}
protected HttpResponse doExecute() {
// Add all available headers
for (String key : odataRequest.getHeaderNames()) {
final String value = odataRequest.getHeader(key);
this.request.addHeader(key, value);
LOG.debug("HTTP header being sent {}: {}", key, value);
}
return executeHttpRequest(httpClient, this.request);
}
private URI checkLocation(URI uri) {
if (!this.uri.getScheme().equals(uri.getScheme())) {
throw new AsyncRequestException("Unexpected scheme in the Location header");
}
if (!this.uri.getHost().equals(uri.getHost())) {
throw new AsyncRequestException("Unexpected host name in the Location header");
}
if (this.uri.getPort() != uri.getPort()) {
throw new AsyncRequestException("Unexpected port in the Location header");
}
return uri;
}
public class AsyncResponseWrapperImpl implements AsyncResponseWrapper<R> {
static final int DEFAULT_RETRY_AFTER = 5;
static final int MAX_RETRY_AFTER = 10;
protected URI location = null;
protected R response = null;
protected int retryAfter = DEFAULT_RETRY_AFTER;
protected boolean preferenceApplied = false;
public AsyncResponseWrapperImpl() {}
/**
* Constructor.
*
* @param res HTTP response.
*/
@SuppressWarnings("unchecked")
public AsyncResponseWrapperImpl(final HttpResponse res) {
if (res.getStatusLine().getStatusCode() == 202) {
retrieveMonitorDetails(res);
} else {
response = (R) ((AbstractODataRequest) odataRequest).getResponseTemplate().initFromHttpResponse(res);
}
}
@Override
public boolean isPreferenceApplied() {
return preferenceApplied;
}
@Override
public boolean isDone() {
if (response == null) {
// check to the monitor URL
final HttpResponse res = checkMonitor(location);
if (res.getStatusLine().getStatusCode() == 202) {
retrieveMonitorDetails(res);
} else {
response = instantiateResponse(res);
}
}
return response != null;
}
@Override
public R getODataResponse() {
HttpResponse res = null;
for (int i = 0; response == null && i < MAX_RETRY; i++) {
res = checkMonitor(location);
if (res.getStatusLine().getStatusCode() == HttpStatusCode.ACCEPTED.getStatusCode()) {
final Header[] headers = res.getHeaders(HttpHeader.RETRY_AFTER);
if (ArrayUtils.isNotEmpty(headers)) {
this.retryAfter = parseReplyAfter(headers[0].getValue());
}
try {
// wait for retry-after
Thread.sleep((long) retryAfter * 1000);
} catch (InterruptedException ignore) {
// ignore
}
} else {
location = null;
return instantiateResponse(res);
}
}
if (response == null) {
throw new ODataClientErrorException(res == null ? null : res.getStatusLine());
}
return response;
}
URI createLocation(String string) {
return checkLocation(URI.create(string));
}
int parseReplyAfter(String value) {
if (value == null || value.isEmpty()) {
return DEFAULT_RETRY_AFTER;
}
try {
int n = Integer.parseInt(value);
if (n < 0) {
return DEFAULT_RETRY_AFTER;
}
return Math.min(n, MAX_RETRY_AFTER);
} catch (NumberFormatException e) {
return DEFAULT_RETRY_AFTER;
}
}
@Override
public ODataDeleteResponse delete() {
final ODataDeleteRequest deleteRequest = odataClient.getCUDRequestFactory().getDeleteRequest(location);
return deleteRequest.execute();
}
@Override
public AsyncResponseWrapper<ODataDeleteResponse> asyncDelete() {
return odataClient.getAsyncRequestFactory().<ODataDeleteResponse> getAsyncRequestWrapper(
odataClient.getCUDRequestFactory().getDeleteRequest(location)).execute();
}
@Override
public AsyncResponseWrapper<R> forceNextMonitorCheck(final URI uri) {
this.location = uri;
this.response = null;
return this;
}
@SuppressWarnings("unchecked")
private R instantiateResponse(final HttpResponse res) {
R odataResponse;
try {
odataResponse = (R) ((AbstractODataRequest) odataRequest).getResponseTemplate().initFromEnclosedPart(res
.getEntity().getContent());
} catch (Exception e) {
LOG.error("Error instantiating odata response", e);
odataResponse = null;
} finally {
HttpClientUtils.closeQuietly(res);
}
return odataResponse;
}
private void retrieveMonitorDetails(final HttpResponse res) {
Header[] headers = res.getHeaders(HttpHeader.LOCATION);
if (ArrayUtils.isNotEmpty(headers)) {
this.location = createLocation(headers[0].getValue());
} else {
throw new AsyncRequestException(
"Invalid async request response. Monitor URL '" + headers[0].getValue() + "'");
}
headers = res.getHeaders(HttpHeader.RETRY_AFTER);
if (ArrayUtils.isNotEmpty(headers)) {
this.retryAfter = parseReplyAfter(headers[0].getValue());
}
headers = res.getHeaders(HttpHeader.PREFERENCE_APPLIED);
if (ArrayUtils.isNotEmpty(headers)) {
for (Header header : headers) {
if (header.getValue().equalsIgnoreCase(new ODataPreferences().respondAsync())) {
preferenceApplied = true;
}
}
}
try {
EntityUtils.consume(res.getEntity());
} catch (IOException ex) {
Logger.getLogger(AsyncRequestWrapperImpl.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
protected final HttpResponse checkMonitor(final URI location) {
if (location == null) {
throw new AsyncRequestException("Invalid async request response. Missing monitor URL");
}
final HttpUriRequest monitor = odataClient.getConfiguration().getHttpUriRequestFactory().create(HttpMethod.GET,
location);
return executeHttpRequest(httpClient, monitor);
}
protected final HttpResponse executeHttpRequest(final HttpClient client, final HttpUriRequest req) {
final HttpResponse response;
try {
response = client.execute(req);
} catch (IOException e) {
throw new HttpClientException(e);
} catch (RuntimeException e) {
req.abort();
throw new HttpClientException(e);
}
checkResponse(odataClient, response, odataRequest.getAccept());
return response;
}
}