| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.component.ahc; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.Serializable; |
| import java.io.UnsupportedEncodingException; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import com.ning.http.client.BodyGenerator; |
| import com.ning.http.client.HttpResponseHeaders; |
| import com.ning.http.client.HttpResponseStatus; |
| import com.ning.http.client.Request; |
| import com.ning.http.client.RequestBuilder; |
| import com.ning.http.client.generators.ByteArrayBodyGenerator; |
| import com.ning.http.client.generators.FileBodyGenerator; |
| import com.ning.http.client.generators.InputStreamBodyGenerator; |
| import org.apache.camel.CamelExchangeException; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.Message; |
| import org.apache.camel.component.ahc.helper.AhcHelper; |
| import org.apache.camel.component.file.GenericFile; |
| import org.apache.camel.converter.IOConverter; |
| import org.apache.camel.spi.HeaderFilterStrategy; |
| import org.apache.camel.util.ExchangeHelper; |
| import org.apache.camel.util.GZIPHelper; |
| import org.apache.camel.util.IOHelper; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class DefaultAhcBinding implements AhcBinding { |
| |
| protected final transient Logger log = LoggerFactory.getLogger(this.getClass()); |
| |
| @Override |
| public Request prepareRequest(AhcEndpoint endpoint, Exchange exchange) throws CamelExchangeException { |
| if (endpoint.isBridgeEndpoint()) { |
| exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE); |
| } |
| |
| RequestBuilder builder = new RequestBuilder(); |
| try { |
| String url = AhcHelper.createURL(exchange, endpoint); |
| log.trace("Setting url {}", url); |
| builder.setUrl(url); |
| } catch (Exception e) { |
| throw new CamelExchangeException("Error creating URL", exchange, e); |
| } |
| String method = extractMethod(exchange); |
| log.trace("Setting method {}", method); |
| builder.setMethod(method); |
| |
| populateHeaders(builder, endpoint, exchange); |
| populateBody(builder, endpoint, exchange); |
| |
| return builder.build(); |
| } |
| |
| protected String extractMethod(Exchange exchange) { |
| // prefer method from header |
| String method = exchange.getIn().getHeader(Exchange.HTTP_METHOD, String.class); |
| if (method != null) { |
| return method; |
| } |
| |
| // if there is a body then do a POST otherwise a GET |
| boolean hasBody = exchange.getIn().getBody() != null; |
| return hasBody ? "POST" : "GET"; |
| } |
| |
| protected void populateHeaders(RequestBuilder builder, AhcEndpoint endpoint, Exchange exchange) { |
| HeaderFilterStrategy strategy = endpoint.getHeaderFilterStrategy(); |
| |
| // propagate headers as HTTP headers |
| for (Map.Entry<String, Object> entry : exchange.getIn().getHeaders().entrySet()) { |
| String headerValue = exchange.getIn().getHeader(entry.getKey(), String.class); |
| if (strategy != null && !strategy.applyFilterToCamelHeaders(entry.getKey(), headerValue, exchange)) { |
| log.trace("Adding header {} = {}", entry.getKey(), headerValue); |
| builder.addHeader(entry.getKey(), headerValue); |
| } |
| } |
| } |
| |
| protected void populateBody(RequestBuilder builder, AhcEndpoint endpoint, Exchange exchange) throws CamelExchangeException { |
| Message in = exchange.getIn(); |
| if (in.getBody() == null) { |
| return; |
| } |
| |
| String contentType = ExchangeHelper.getContentType(exchange); |
| BodyGenerator body = in.getBody(BodyGenerator.class); |
| String charset = IOHelper.getCharsetName(exchange, false); |
| |
| if (body == null) { |
| try { |
| Object data = in.getBody(); |
| if (data != null) { |
| if (contentType != null && AhcConstants.CONTENT_TYPE_JAVA_SERIALIZED_OBJECT.equals(contentType)) { |
| // serialized java object |
| Serializable obj = in.getMandatoryBody(Serializable.class); |
| // write object to output stream |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| AhcHelper.writeObjectToStream(bos, obj); |
| byte[] bytes = bos.toByteArray(); |
| body = new ByteArrayBodyGenerator(bytes); |
| IOHelper.close(bos); |
| } else if (data instanceof File || data instanceof GenericFile) { |
| // file based (could potentially also be a FTP file etc) |
| File file = in.getBody(File.class); |
| if (file != null) { |
| body = new FileBodyGenerator(file); |
| } |
| } else if (data instanceof String) { |
| // be a bit careful with String as any type can most likely be converted to String |
| // so we only do an instanceof check and accept String if the body is really a String |
| // do not fallback to use the default charset as it can influence the request |
| // (for example application/x-www-form-urlencoded forms being sent) |
| if (charset != null) { |
| body = new ByteArrayBodyGenerator(((String) data).getBytes(charset)); |
| } else { |
| body = new ByteArrayBodyGenerator(((String) data).getBytes()); |
| } |
| } |
| // fallback as input stream |
| if (body == null) { |
| // force the body as an input stream since this is the fallback |
| InputStream is = in.getMandatoryBody(InputStream.class); |
| body = new InputStreamBodyGenerator(is); |
| } |
| } |
| } catch (UnsupportedEncodingException e) { |
| throw new CamelExchangeException("Error creating BodyGenerator from message body", exchange, e); |
| } catch (IOException e) { |
| throw new CamelExchangeException("Error serializing message body", exchange, e); |
| } |
| } |
| |
| if (body != null) { |
| log.trace("Setting body {}", body); |
| builder.setBody(body); |
| } |
| if (charset != null) { |
| log.trace("Setting body charset {}", charset); |
| builder.setBodyEncoding(charset); |
| } |
| // must set content type, even if its null, otherwise it may default to |
| // application/x-www-form-urlencoded which may not be your intention |
| log.trace("Setting Content-Type {}", contentType); |
| builder.setHeader(Exchange.CONTENT_TYPE, contentType); |
| } |
| |
| @Override |
| public void onThrowable(AhcEndpoint endpoint, Exchange exchange, Throwable t) throws Exception { |
| exchange.setException(t); |
| } |
| |
| @Override |
| public void onStatusReceived(AhcEndpoint endpoint, Exchange exchange, HttpResponseStatus responseStatus) throws Exception { |
| exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, responseStatus.getStatusCode()); |
| } |
| |
| @Override |
| public void onHeadersReceived(AhcEndpoint endpoint, Exchange exchange, HttpResponseHeaders headers) throws Exception { |
| for (Map.Entry<String, List<String>> entry : headers.getHeaders().entrySet()) { |
| String key = entry.getKey(); |
| List<String> value = entry.getValue(); |
| if (value.size() == 1) { |
| exchange.getOut().getHeaders().put(key, value.get(0)); |
| } else { |
| exchange.getOut().getHeaders().put(key, value); |
| } |
| } |
| } |
| |
| @Override |
| public void onComplete(AhcEndpoint endpoint, Exchange exchange, String url, ByteArrayOutputStream os, int contentLength, |
| int statusCode, String statusText) throws Exception { |
| // copy from output stream to input stream |
| os.flush(); |
| os.close(); |
| InputStream is = new ByteArrayInputStream(os.toByteArray()); |
| |
| String contentEncoding = exchange.getOut().getHeader(Exchange.CONTENT_ENCODING, String.class); |
| if (!exchange.getProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.FALSE, Boolean.class)) { |
| is = GZIPHelper.uncompressGzip(contentEncoding, is); |
| } |
| |
| // Honor the character encoding |
| String contentType = exchange.getOut().getHeader(Exchange.CONTENT_TYPE, String.class); |
| if (contentType != null) { |
| // find the charset and set it to the Exchange |
| AhcHelper.setCharsetFromContentType(contentType, exchange); |
| } |
| |
| Object body = is; |
| // if content type is a serialized java object then de-serialize it back to a Java object |
| if (contentType != null && contentType.equals(AhcConstants.CONTENT_TYPE_JAVA_SERIALIZED_OBJECT)) { |
| body = AhcHelper.deserializeJavaObjectFromStream(is); |
| } |
| |
| if (!endpoint.isThrowExceptionOnFailure()) { |
| // if we do not use failed exception then populate response for all response codes |
| populateResponse(exchange, body, contentLength, statusCode); |
| } else { |
| if (statusCode >= 100 && statusCode < 300) { |
| // only populate response for OK response |
| populateResponse(exchange, body, contentLength, statusCode); |
| } else { |
| // operation failed so populate exception to throw |
| throw populateHttpOperationFailedException(endpoint, exchange, url, body, contentLength, statusCode, statusText); |
| } |
| } |
| } |
| |
| private Exception populateHttpOperationFailedException(AhcEndpoint endpoint, Exchange exchange, String url, |
| Object body, int contentLength, |
| int statusCode, String statusText) { |
| Exception answer; |
| |
| if (endpoint.isTransferException() && body != null && body instanceof Exception) { |
| // if the response was a serialized exception then use that |
| return (Exception) body; |
| } |
| |
| // make a defensive copy of the response body in the exception so its detached from the cache |
| String copy = null; |
| if (body != null) { |
| copy = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, body); |
| } |
| |
| Map<String, String> headers = extractResponseHeaders(exchange); |
| |
| if (statusCode >= 300 && statusCode < 400) { |
| String redirectLocation = exchange.getOut().getHeader("Location", String.class); |
| if (redirectLocation != null) { |
| answer = new AhcOperationFailedException(url, statusCode, statusText, redirectLocation, headers, copy); |
| } else { |
| // no redirect location |
| answer = new AhcOperationFailedException(url, statusCode, statusText, null, headers, copy); |
| } |
| } else { |
| // internal server error (error code 500) |
| answer = new AhcOperationFailedException(url, statusCode, statusText, null, headers, copy); |
| } |
| |
| return answer; |
| } |
| |
| private Map<String, String> extractResponseHeaders(Exchange exchange) { |
| Map<String, String> answer = new LinkedHashMap<String, String>(); |
| for (Map.Entry<String, Object> entry : exchange.getOut().getHeaders().entrySet()) { |
| String key = entry.getKey(); |
| String value = exchange.getContext().getTypeConverter().convertTo(String.class, entry.getValue()); |
| if (value != null) { |
| answer.put(key, value); |
| } |
| } |
| return answer; |
| } |
| |
| private void populateResponse(Exchange exchange, Object body, int contentLength, int responseCode) { |
| exchange.getOut().setBody(body); |
| exchange.getOut().setHeader(Exchange.CONTENT_LENGTH, contentLength); |
| } |
| } |