blob: 10f7186463ebae65f5c501ffaa3116b0e37eca90 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.jetty;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.http.common.HttpConstants;
import org.apache.camel.http.common.HttpHelper;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.component.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @version
*/
public class JettyHttpProducer extends DefaultAsyncProducer implements AsyncProcessor {
private static final Logger LOG = LoggerFactory.getLogger(JettyHttpProducer.class);
private HttpClient client;
private boolean sharedClient;
private JettyHttpBinding binding;
/**
* Creates this producer.
* <p/>
* A client must be set before use, eg either {@link #setClient(org.eclipse.jetty.client.HttpClient)}
* or {@link #setSharedClient(org.eclipse.jetty.client.HttpClient)}.
*
* @param endpoint the endpoint
*/
public JettyHttpProducer(Endpoint endpoint) {
super(endpoint);
}
/**
* Creates this producer
*
* @param endpoint the endpoint
* @param client the non-shared client to use
*/
public JettyHttpProducer(Endpoint endpoint, HttpClient client) {
super(endpoint);
setClient(client);
}
@Override
public JettyHttpEndpoint getEndpoint() {
return (JettyHttpEndpoint) super.getEndpoint();
}
public boolean process(Exchange exchange, final AsyncCallback callback) {
try {
processInternal(exchange, callback);
} catch (Exception e) {
// error occurred before we had a chance to go async
// so set exception and invoke callback true
exchange.setException(e);
callback.done(true);
return true;
}
// we should continue processing this asynchronously
return false;
}
private void processInternal(Exchange exchange, AsyncCallback callback) throws Exception {
// creating the url to use takes 2-steps
String url = HttpHelper.createURL(exchange, getEndpoint());
URI uri = HttpHelper.createURI(exchange, url, getEndpoint());
// get the url from the uri
url = uri.toASCIIString();
// execute any custom url rewrite
String rewriteUrl = HttpHelper.urlRewrite(exchange, url, getEndpoint(), this);
if (rewriteUrl != null) {
// update url and query string from the rewritten url
url = rewriteUrl;
}
String methodName = HttpHelper.createMethod(exchange, getEndpoint(), exchange.getIn().getBody() != null).name();
JettyContentExchange httpExchange = getEndpoint().createContentExchange();
httpExchange.init(exchange, getBinding(), client, callback);
httpExchange.setURL(url); // Url has to be set first
httpExchange.setMethod(methodName);
if (getEndpoint().getHttpClientParameters() != null) {
// For jetty 9 these parameters can not be set on the client
// so we need to set them on the httpExchange
String timeout = (String)getEndpoint().getHttpClientParameters().get("timeout");
if (timeout != null) {
httpExchange.setTimeout(new Long(timeout));
}
String supportRedirect = (String)getEndpoint().getHttpClientParameters().get("supportRedirect");
if (supportRedirect != null) {
httpExchange.setSupportRedirect(Boolean.valueOf(supportRedirect));
}
}
LOG.trace("Using URL: {} with method: {}", url, methodName);
// if there is a body to send as data
if (exchange.getIn().getBody() != null) {
String contentType = ExchangeHelper.getContentType(exchange);
if (contentType != null) {
httpExchange.setRequestContentType(contentType);
}
if (contentType != null && HttpConstants.CONTENT_TYPE_JAVA_SERIALIZED_OBJECT.equals(contentType)) {
if (getEndpoint().getComponent().isAllowJavaSerializedObject() || getEndpoint().isTransferException()) {
// serialized java object
Serializable obj = exchange.getIn().getMandatoryBody(Serializable.class);
// write object to output stream
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
HttpHelper.writeObjectToStream(bos, obj);
httpExchange.setRequestContent(bos.toByteArray());
} finally {
IOHelper.close(bos, "body", LOG);
}
} else {
throw new RuntimeCamelException("Content-type " + HttpConstants.CONTENT_TYPE_JAVA_SERIALIZED_OBJECT + " is not allowed");
}
} else {
Object body = exchange.getIn().getBody();
if (body instanceof String) {
String data = (String) body;
// be a bit careful with String as any type can most likely be converted to String
// so we only do an instanceof check and accept String if the body is really a String
// do not fallback to use the default charset as it can influence the request
// (for example application/x-www-form-urlencoded forms being sent)
String charset = IOHelper.getCharsetName(exchange, false);
httpExchange.setRequestContent(data, charset);
} else {
// then fallback to input stream
InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, exchange.getIn().getBody());
httpExchange.setRequestContent(is);
// setup the content length if it is possible
String length = exchange.getIn().getHeader(Exchange.CONTENT_LENGTH, String.class);
if (ObjectHelper.isNotEmpty(length)) {
httpExchange.addRequestHeader(Exchange.CONTENT_LENGTH, length);
}
}
}
}
// if we bridge endpoint then we need to skip matching headers with the HTTP_QUERY to avoid sending
// duplicated headers to the receiver, so use this skipRequestHeaders as the list of headers to skip
Map<String, Object> skipRequestHeaders = null;
if (getEndpoint().isBridgeEndpoint()) {
exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
String queryString = exchange.getIn().getHeader(Exchange.HTTP_QUERY, String.class);
if (queryString != null) {
skipRequestHeaders = URISupport.parseQuery(queryString, false, true);
}
// Need to remove the Host key as it should be not used
exchange.getIn().getHeaders().remove("host");
}
// propagate headers as HTTP headers
Message in = exchange.getIn();
HeaderFilterStrategy strategy = getEndpoint().getHeaderFilterStrategy();
for (Map.Entry<String, Object> entry : in.getHeaders().entrySet()) {
String key = entry.getKey();
Object headerValue = in.getHeader(key);
if (headerValue != null) {
// use an iterator as there can be multiple values. (must not use a delimiter, and allow empty values)
final Iterator<?> it = ObjectHelper.createIterator(headerValue, null, true);
// the values to add as a request header
final List<String> values = new ArrayList<String>();
// if its a multi value then check each value if we can add it and for multi values they
// should be combined into a single value
while (it.hasNext()) {
String value = exchange.getContext().getTypeConverter().convertTo(String.class, it.next());
// we should not add headers for the parameters in the uri if we bridge the endpoint
// as then we would duplicate headers on both the endpoint uri, and in HTTP headers as well
if (skipRequestHeaders != null && skipRequestHeaders.containsKey(key)) {
continue;
}
if (value != null && strategy != null && !strategy.applyFilterToCamelHeaders(key, value, exchange)) {
values.add(value);
}
}
// add the value(s) as a http request header
if (values.size() > 0) {
// use the default toString of a ArrayList to create in the form [xxx, yyy]
// if multi valued, for a single value, then just output the value as is
String s = values.size() > 1 ? values.toString() : values.get(0);
httpExchange.addRequestHeader(key, s);
}
}
}
// set the callback, which will handle all the response logic
if (LOG.isDebugEnabled()) {
LOG.debug("Sending HTTP request to: {}", httpExchange.getUrl());
}
httpExchange.send(client);
}
public JettyHttpBinding getBinding() {
return binding;
}
public void setBinding(JettyHttpBinding binding) {
this.binding = binding;
}
public HttpClient getClient() {
return client;
}
public void setClient(HttpClient client) {
this.client = client;
this.sharedClient = false;
}
public HttpClient getSharedClient() {
if (sharedClient) {
return client;
} else {
return null;
}
}
public void setSharedClient(HttpClient sharedClient) {
this.client = sharedClient;
this.sharedClient = true;
}
private Object getClientThreadPool() {
try {
return client.getClass().getMethod("getThreadPool").invoke(client);
} catch (Throwable t) {
// not found in Jetty 9 which is OK as the threadpool is auto started on Jetty 9
}
return null;
}
@Override
protected void doStart() throws Exception {
// only start non-shared client
if (!sharedClient && client != null) {
client.start();
// start the thread pool
Object tp = getClientThreadPool();
if (tp instanceof LifeCycle) {
LOG.debug("Starting client thread pool {}", tp);
((LifeCycle) tp).start();
}
}
super.doStart();
}
@Override
protected void doStop() throws Exception {
super.doStop();
// only stop non-shared client
if (!sharedClient && client != null) {
client.stop();
// stop thread pool
Object tp = getClientThreadPool();
if (tp instanceof LifeCycle) {
LOG.debug("Stopping client thread pool {}", tp);
((LifeCycle) tp).stop();
}
}
}
}