blob: 0091dcb2d65cece9115fa29658bb676e8509f300 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.http;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.PredicatedHandler {
private final KnativeHttpTransport transport;
private final Predicate<HttpServerRequest> filter;
private final KnativeHttp.ServerKey key;
private final KnativeEnvironment.KnativeServiceDefinition serviceDefinition;
private final HeaderFilterStrategy headerFilterStrategy;
public KnativeHttpConsumer(
KnativeHttpTransport transport,
Endpoint endpoint,
KnativeEnvironment.KnativeServiceDefinition serviceDefinition,
Processor processor) {
super(endpoint, processor);
this.transport = transport;
this.serviceDefinition = serviceDefinition;
this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
this.key = new KnativeHttp.ServerKey(serviceDefinition.getHost(), serviceDefinition.getPortOrDefault(KnativeHttp.DEFAULT_PORT));
this.filter = KnativeHttpSupport.createFilter(serviceDefinition);
}
@Override
protected void doStart() throws Exception {
this.transport.getDispatcher(key).bind(this);
super.doStart();
}
@Override
protected void doStop() throws Exception {
this.transport.getDispatcher(key).unbind(this);
super.doStop();
}
@Override
public boolean canHandle(HttpServerRequest request) {
return filter.test(request);
}
@Override
public void handle(HttpServerRequest request) {
if (request.method() == HttpMethod.POST) {
final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut);
final Message in = toMessage(request, exchange);
request.bodyHandler(buffer -> {
in.setBody(buffer.getBytes());
exchange.setIn(in);
try {
createUoW(exchange);
getAsyncProcessor().process(exchange, doneSync -> {
try {
HttpServerResponse response = toHttpResponse(request, exchange.getMessage());
Buffer body = null;
if (request.response().getStatusCode() != 204) {
body = computeResponseBody(exchange.getMessage());
// set the content type in the response.
String contentType = MessageHelper.getContentType(exchange.getMessage());
if (contentType != null) {
// set content-type
response.putHeader(Exchange.CONTENT_TYPE, contentType);
}
}
if (body != null) {
request.response().end(body);
} else {
request.response().setStatusCode(204);
request.response().end();
}
} catch (Exception e) {
getExceptionHandler().handleException(e);
}
});
} catch (Exception e) {
getExceptionHandler().handleException(e);
} finally {
doneUoW(exchange);
}
});
} else {
request.response().setStatusCode(405);
request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain");
request.response().end("Unsupported method");
throw new IllegalArgumentException("Unsupported method: " + request.method());
}
}
private Message toMessage(HttpServerRequest request, Exchange exchange) {
Message message = new DefaultMessage(exchange.getContext());
String path = request.path();
if (serviceDefinition.getPath() != null) {
String endpointPath = serviceDefinition.getPath();
String matchPath = path.toLowerCase(Locale.US);
String match = endpointPath.toLowerCase(Locale.US);
if (matchPath.startsWith(match)) {
path = path.substring(endpointPath.length());
}
}
for (Map.Entry<String, String> entry : request.headers().entries()) {
if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue());
}
}
for (Map.Entry<String, String> entry : request.params().entries()) {
if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue());
}
}
message.setHeader(Exchange.HTTP_PATH, path);
message.setHeader(Exchange.HTTP_METHOD, request.method());
message.setHeader(Exchange.HTTP_URI, request.uri());
message.setHeader(Exchange.HTTP_QUERY, request.query());
return message;
}
private HttpServerResponse toHttpResponse(HttpServerRequest request, Message message) {
final HttpServerResponse response = request.response();
final boolean failed = message.getExchange().isFailed();
final int defaultCode = failed ? 500 : 200;
final int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class);
final TypeConverter tc = message.getExchange().getContext().getTypeConverter();
response.setStatusCode(code);
for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
final String key = entry.getKey();
final Object value = entry.getValue();
for (Object it: org.apache.camel.support.ObjectHelper.createIterable(value, null)) {
String headerValue = tc.convertTo(String.class, it);
if (headerValue == null) {
continue;
}
if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
response.putHeader(key, headerValue);
}
}
}
return response;
}
private Buffer computeResponseBody(Message message) throws NoTypeConversionAvailableException {
Object body = message.getBody();
Exception exception = message.getExchange().getException();
if (exception != null) {
// we failed due an exception so print it as plain text
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
exception.printStackTrace(pw);
// the body should then be the stacktrace
body = sw.toString().getBytes(StandardCharsets.UTF_8);
// force content type to be text/plain as that is what the stacktrace is
message.setHeader(Exchange.CONTENT_TYPE, "text/plain");
// and mark the exception as failure handled, as we handled it by returning
// it as the response
ExchangeHelper.setFailureHandled(message.getExchange());
}
return body != null
? Buffer.buffer(message.getExchange().getContext().getTypeConverter().mandatoryConvertTo(byte[].class, body))
: null;
}
}