blob: f325a919e82c4df1691dc780242c56e23acaaf28 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.k.http.engine;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.RoutingContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.TypeConversionException;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.platform.http.PlatformHttpEndpoint;
import org.apache.camel.component.platform.http.spi.Method;
import org.apache.camel.k.http.PlatformHttp;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RuntimePlatformHttpConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(RuntimePlatformHttpConsumer.class);
private static final Pattern PATH_PARAMETER_PATTERN = Pattern.compile("\\{([^/}]+)\\}");
private Route route;
public RuntimePlatformHttpConsumer(PlatformHttpEndpoint endpoint, Processor processor) {
super(endpoint, processor);
}
@Override
public PlatformHttpEndpoint getEndpoint() {
return (PlatformHttpEndpoint) super.getEndpoint();
}
@Override
protected void doInit() throws Exception {
super.doStart();
final PlatformHttpEndpoint endpoint = getEndpoint();
final PlatformHttp router = PlatformHttp.lookup(endpoint.getCamelContext());
final String path = endpoint.getPath();
final String vertxPathParamPath = PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1");
final Route newRoute = router.router().route(vertxPathParamPath);
router.handlers().forEach(newRoute::handler);
final Set<Method> methods = Method.parseList(endpoint.getHttpMethodRestrict());
if (!methods.equals(Method.getAll())) {
methods.stream().forEach(m -> newRoute.method(HttpMethod.valueOf(m.name())));
}
if (endpoint.getConsumes() != null) {
newRoute.consumes(endpoint.getConsumes());
}
if (endpoint.getProduces() != null) {
newRoute.produces(endpoint.getProduces());
}
newRoute.handler(
ctx -> {
Exchange exchg = null;
try {
final Exchange exchange = exchg = toExchange(ctx);
createUoW(exchange);
getAsyncProcessor().process(
exchange,
doneSync -> writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy()));
} catch (Exception e) {
ctx.fail(e);
getExceptionHandler().handleException("Failed handling platform-http endpoint " + path, exchg, e);
} finally {
if (exchg != null) {
doneUoW(exchg);
}
}
});
this.route = newRoute;
}
@Override
protected void doStop() throws Exception {
if (route != null) {
route.remove();
route = null;
}
super.doStop();
}
@Override
protected void doSuspend() throws Exception {
if (route != null) {
route.disable();
}
super.doSuspend();
}
@Override
protected void doResume() throws Exception {
if (route != null) {
route.enable();
}
super.doResume();
}
static Object toHttpResponse(HttpServerResponse response, Message message, HeaderFilterStrategy headerFilterStrategy) {
final Exchange exchange = message.getExchange();
final TypeConverter tc = exchange.getContext().getTypeConverter();
final int code = determineResponseCode(exchange, message.getBody());
response.setStatusCode(code);
// copy headers from Message to Response
if (headerFilterStrategy != null) {
for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
final String key = entry.getKey();
final Object value = entry.getValue();
// use an iterator as there can be multiple values. (must not use a delimiter)
final Iterator<?> it = ObjectHelper.createIterator(value, null);
String firstValue = null;
List<String> values = null;
while (it.hasNext()) {
final String headerValue = tc.convertTo(String.class, it.next());
if (headerValue != null
&& !headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, exchange)) {
if (firstValue == null) {
firstValue = headerValue;
} else {
if (values == null) {
values = new ArrayList<>();
values.add(firstValue);
}
values.add(headerValue);
}
}
}
if (values != null) {
response.putHeader(key, values);
} else if (firstValue != null) {
response.putHeader(key, firstValue);
}
}
}
Object body = message.getBody();
final Exception exception = exchange.getException();
if (exception != null) {
// we failed due an exception so print it as plain text
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw);
exception.printStackTrace(pw);
// the body should then be the stacktrace
body = ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
// force content type to be text/plain as that is what the stacktrace is
message.setHeader(Exchange.CONTENT_TYPE, "text/plain; charset=utf-8");
// and mark the exception as failure handled, as we handled it by returning it as the response
ExchangeHelper.setFailureHandled(exchange);
}
// set the content-length if it can be determined, or chunked encoding
final Integer length = determineContentLength(exchange, body);
if (length != null) {
response.putHeader("Content-Length", String.valueOf(length));
} else {
response.setChunked(true);
}
// set the content type in the response.
final String contentType = MessageHelper.getContentType(message);
if (contentType != null) {
// set content-type
response.putHeader("Content-Type", contentType);
}
return body;
}
static Integer determineContentLength(Exchange camelExchange, Object body) {
if (body instanceof byte[]) {
return ((byte[]) body).length;
} else if (body instanceof ByteBuffer) {
return ((ByteBuffer) body).remaining();
}
return null;
}
/*
* Copied from org.apache.camel.http.common.DefaultHttpBinding.determineResponseCode(Exchange, Object)
* If DefaultHttpBinding.determineResponseCode(Exchange, Object) is moved to a module without the servlet-api
* dependency we could eventually consume it from there.
*/
static int determineResponseCode(Exchange camelExchange, Object body) {
boolean failed = camelExchange.isFailed();
int defaultCode = failed ? 500 : 200;
Message message = camelExchange.getMessage();
Integer currentCode = message.getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
int codeToUse = currentCode == null ? defaultCode : currentCode;
if (codeToUse != 500) {
if ((body == null) || (body instanceof String && ((String) body).trim().isEmpty())) {
// no content
codeToUse = currentCode == null ? 204 : currentCode;
}
}
return codeToUse;
}
static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy) {
final Object body = toHttpResponse(ctx.response(), camelExchange.getMessage(), headerFilterStrategy);
final HttpServerResponse response = ctx.response();
if (body == null) {
LOG.trace("No payload to send as reply for exchange: {}", camelExchange);
response.end();
} else if (body instanceof String) {
response.end((String) body);
} else if (body instanceof InputStream) {
final byte[] bytes = new byte[4096];
try (InputStream in = (InputStream) body) {
int len;
while ((len = in.read(bytes)) >= 0) {
response.write(Buffer.buffer(len).appendBytes(bytes, 0, len));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
response.end();
} else {
final TypeConverter tc = camelExchange.getContext().getTypeConverter();
try {
final ByteBuffer bb = tc.mandatoryConvertTo(ByteBuffer.class, body);
response.end(Buffer.buffer(bb.capacity()).setBytes(0, bb));
} catch (TypeConversionException | NoTypeConversionAvailableException e) {
throw new RuntimeException(e);
}
}
}
private Exchange toExchange(RoutingContext ctx) {
final Exchange exchange = getEndpoint().createExchange();
Message in = toCamelMessage(ctx, exchange);
final String charset = ctx.parsedHeaders().contentType().parameter("charset");
if (charset != null) {
exchange.setProperty(Exchange.CHARSET_NAME, charset);
in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, charset);
}
exchange.setIn(in);
return exchange;
}
private Message toCamelMessage(RoutingContext ctx, Exchange exchange) {
final Message result = new DefaultMessage(exchange);
final HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy();
populateCamelHeaders(ctx, result.getHeaders(), exchange, headerFilterStrategy);
final String mimeType = ctx.parsedHeaders().contentType().value();
final boolean isMultipartFormData = "multipart/form-data".equals(mimeType);
if ("application/x-www-form-urlencoded".equals(mimeType) || isMultipartFormData) {
final MultiMap formData = ctx.request().formAttributes();
final Map<String, Object> body = new HashMap<>();
for (String key : formData.names()) {
for (String value : formData.getAll(key)) {
if (headerFilterStrategy != null
&& !headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) {
appendHeader(result.getHeaders(), key, value);
appendHeader(body, key, value);
}
}
}
result.setBody(body);
if (isMultipartFormData) {
//populateAttachments(ctx.fileUploads(), result);
}
} else {
// extract body by myself if undertow parser didn't handle and the method is allowed to have one
// body is extracted as byte[] then auto TypeConverter kicks in
Method m = Method.valueOf(ctx.request().method().name());
if (m.canHaveBody()) {
final Buffer body = ctx.getBody();
if (body != null) {
result.setBody(body.getBytes());
}
} else {
result.setBody(null);
}
}
return result;
}
static void populateCamelHeaders(
RoutingContext ctx,
Map<String, Object> headersMap,
Exchange exchange,
HeaderFilterStrategy headerFilterStrategy) {
final HttpServerRequest request = ctx.request();
headersMap.put(Exchange.HTTP_PATH, request.path());
if (headerFilterStrategy != null) {
final MultiMap requestHeaders = request.headers();
final String authz = requestHeaders.get("authorization");
// store a special header that this request was authenticated using HTTP Basic
if (authz != null && authz.trim().startsWith("Basic")) {
if (!headerFilterStrategy.applyFilterToExternalHeaders(Exchange.AUTHENTICATION, "Basic", exchange)) {
appendHeader(headersMap, Exchange.AUTHENTICATION, "Basic");
}
}
for (String name : requestHeaders.names()) {
// add the headers one by one, and use the header filter strategy
for (String value : requestHeaders.getAll(name)) {
if (!headerFilterStrategy.applyFilterToExternalHeaders(name.toString(), value, exchange)) {
appendHeader(headersMap, name.toString(), value);
}
}
}
// process uri parameters as headers
final MultiMap pathParameters = ctx.queryParams();
// continue if the map is not empty, otherwise there are no params
if (!pathParameters.isEmpty()) {
for (String name : pathParameters.names()) {
for (String value : pathParameters.getAll(name)) {
if (!headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
appendHeader(headersMap, name, value);
}
}
}
}
}
// Path parameters
ctx.pathParams().forEach((k, v) -> appendHeader(headersMap, k, v));
// NOTE: these headers is applied using the same logic as camel-http/camel-jetty to be consistent
headersMap.put(Exchange.HTTP_METHOD, request.method().toString());
// strip query parameters from the uri
headersMap.put(Exchange.HTTP_URL, request.absoluteURI());
// uri is without the host and port
headersMap.put(Exchange.HTTP_URI, request.uri());
headersMap.put(Exchange.HTTP_QUERY, request.query());
headersMap.put(Exchange.HTTP_RAW_QUERY, request.query());
}
@SuppressWarnings("unchecked")
static void appendHeader(Map<String, Object> headers, String key, Object value) {
if (headers.containsKey(key)) {
Object existing = headers.get(key);
List<Object> list;
if (existing instanceof List) {
list = (List<Object>) existing;
} else {
list = new ArrayList<>();
list.add(existing);
}
list.add(value);
value = list;
}
headers.put(key, value);
}
}