blob: 792363d5f2cd009d91bc680622a912521beb5378 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.http;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.knative.spi.KnativeResource;
import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KnativeHttpConsumer extends DefaultConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpConsumer.class);
private final KnativeTransportConfiguration configuration;
private final Predicate<HttpServerRequest> filter;
private final KnativeResource resource;
private final Router router;
private final HeaderFilterStrategy headerFilterStrategy;
private String basePath;
private Route route;
private BigInteger maxBodySize;
private boolean preallocateBodyBuffer;
public KnativeHttpConsumer(
KnativeTransportConfiguration configuration,
Endpoint endpoint,
KnativeResource resource,
Router router,
Processor processor) {
super(endpoint, processor);
this.configuration = configuration;
this.resource = resource;
this.router = router;
this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
this.filter = KnativeHttpSupport.createFilter(this.configuration.getCloudEvent(), resource);
this.preallocateBodyBuffer = true;
}
public String getBasePath() {
return basePath;
}
public void setBasePath(String basePath) {
this.basePath = basePath;
}
public BigInteger getMaxBodySize() {
return maxBodySize;
}
public void setMaxBodySize(BigInteger maxBodySize) {
this.maxBodySize = maxBodySize;
}
public boolean isPreallocateBodyBuffer() {
return preallocateBodyBuffer;
}
public void setPreallocateBodyBuffer(boolean preallocateBodyBuffer) {
this.preallocateBodyBuffer = preallocateBodyBuffer;
}
@Override
protected void doStart() throws Exception {
if (route == null) {
String path = resource.getPath();
if (ObjectHelper.isEmpty(path)) {
path = "/";
}
if (ObjectHelper.isNotEmpty(basePath)) {
path = basePath + path;
}
LOGGER.debug("Creating route for path: {}", path);
route = router.route(
HttpMethod.POST,
path
);
BodyHandler bodyHandler = BodyHandler.create();
bodyHandler.setPreallocateBodyBuffer(this.preallocateBodyBuffer);
if (this.maxBodySize != null) {
bodyHandler.setBodyLimit(this.maxBodySize.longValueExact());
}
// add body handler
route.handler(new Handler<RoutingContext>() {
@Override
public void handle(RoutingContext event) {
event.request().resume();
bodyHandler.handle(event);
}
});
// add knative handler
route.handler(routingContext -> {
LOGGER.debug("Handling {}", routingContext);
if (filter.test(routingContext.request())) {
handleRequest(routingContext);
} else {
LOGGER.debug("Cannot handle request on {}, next", getEndpoint().getEndpointUri());
routingContext.next();
}
});
}
super.doStart();
}
@Override
protected void doStop() throws Exception {
if (route != null) {
route.remove();
}
super.doStop();
}
@Override
protected void doSuspend() throws Exception {
if (route != null) {
route.disable();
}
}
@Override
protected void doResume() throws Exception {
if (route != null) {
route.enable();
}
}
private void handleRequest(RoutingContext routingContext) {
final HttpServerRequest request = routingContext.request();
final Exchange exchange = getEndpoint().createExchange();
final Message message = toMessage(request, exchange);
Buffer payload = routingContext.getBody();
if (payload != null) {
message.setBody(payload.getBytes());
} else {
message.setBody(null);
}
// We do not know if any of the processing logic of the route is synchronous or not so we
// need to process the request on a thread on the Vert.x worker pool.
//
// As example the following route may block the Vert.x event loop as the camel-http component
// is not async so if the service is scaled-down, the it may take a while to become ready and
// the camel-http component blocks until the service becomes available.
//
// from("knative:event/my.event")
// .to("http://{{env:PROJECT}}.{{env:NAMESPACE}}.svc.cluster.local/service");
//
routingContext.vertx().executeBlocking(
promise -> {
try {
createUoW(exchange);
} catch (Exception e) {
promise.fail(e);
return;
}
getAsyncProcessor().process(exchange, c -> {
if (!exchange.isFailed()) {
promise.complete();
} else {
promise.fail(exchange.getException());
}
});
},
false,
result -> {
try {
Throwable failure = null;
if (result.succeeded()) {
try {
HttpServerResponse response = toHttpResponse(request, exchange.getMessage());
Buffer body = null;
if (request.response().getStatusCode() != 204 && configuration.isReply()) {
body = computeResponseBody(exchange.getMessage());
String contentType = MessageHelper.getContentType(exchange.getMessage());
if (contentType != null) {
response.putHeader(Exchange.CONTENT_TYPE, contentType);
}
}
if (body != null) {
request.response().end(body);
} else {
request.response().setStatusCode(204);
request.response().end();
}
} catch (Exception e) {
failure = e;
}
} else if (result.failed()) {
failure = result.cause();
}
if (failure != null) {
getExceptionHandler().handleException(failure);
routingContext.fail(failure);
}
} finally {
doneUoW(exchange);
}
});
}
private Message toMessage(HttpServerRequest request, Exchange exchange) {
Message message = exchange.getMessage();
String path = request.path();
if (resource.getPath() != null) {
String endpointPath = resource.getPath();
String matchPath = path.toLowerCase(Locale.US);
String match = endpointPath.toLowerCase(Locale.US);
if (matchPath.startsWith(match)) {
path = path.substring(endpointPath.length());
}
}
for (Map.Entry<String, String> entry : request.headers().entries()) {
if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue());
}
}
for (Map.Entry<String, String> entry : request.params().entries()) {
if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue());
}
}
message.setHeader(Exchange.HTTP_PATH, path);
message.setHeader(Exchange.HTTP_METHOD, request.method());
message.setHeader(Exchange.HTTP_URI, request.uri());
message.setHeader(Exchange.HTTP_QUERY, request.query());
return message;
}
private HttpServerResponse toHttpResponse(HttpServerRequest request, Message message) {
final HttpServerResponse response = request.response();
final boolean failed = message.getExchange().isFailed();
final int defaultCode = failed ? 500 : 200;
final int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class);
final TypeConverter tc = message.getExchange().getContext().getTypeConverter();
response.setStatusCode(code);
if (configuration.isReply()) {
for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
final String key = entry.getKey();
final Object value = entry.getValue();
for (Object it : org.apache.camel.support.ObjectHelper.createIterable(value, null)) {
String headerValue = tc.convertTo(String.class, it);
if (headerValue == null) {
continue;
}
if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
response.putHeader(key, headerValue);
}
}
}
KnativeHttpSupport.remapCloudEventHeaders(configuration.getCloudEvent(), message);
if (configuration.isRemoveCloudEventHeadersInReply()) {
KnativeHttpSupport.removeCloudEventHeaders(configuration.getCloudEvent(), message);
}
}
return response;
}
private Buffer computeResponseBody(Message message) throws NoTypeConversionAvailableException {
Object body = message.getBody();
Exception exception = message.getExchange().getException();
if (exception != null) {
// we failed due an exception so print it as plain text
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
try {
exception.printStackTrace(pw);
// the body should then be the stacktrace
body = sw.toString().getBytes(StandardCharsets.UTF_8);
// force content type to be text/plain as that is what the stacktrace is
message.setHeader(Exchange.CONTENT_TYPE, "text/plain");
// and mark the exception as failure handled, as we handled it by returning
// it as the response
ExchangeHelper.setFailureHandled(message.getExchange());
} finally {
IOHelper.close(pw, sw);
}
}
return body != null
? Buffer.buffer(message.getExchange().getContext().getTypeConverter().mandatoryConvertTo(byte[].class, body))
: null;
}
}