blob: f70b4f3d2a085087a53072c4c67305b3681eaae8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.undertow;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.net.ssl.SSLContext;
import io.undertow.client.ClientRequest;
import io.undertow.client.UndertowClient;
import io.undertow.protocols.ssl.UndertowXnioSsl;
import io.undertow.server.DefaultByteBufferPool;
import io.undertow.util.HeaderMap;
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.undertow.handlers.CamelWebSocketHandler;
import org.apache.camel.http.common.cookie.CookieHandler;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.URISupport;
import org.xnio.OptionMap;
import org.xnio.Xnio;
import org.xnio.XnioWorker;
import org.xnio.ssl.XnioSsl;
/**
* The Undertow producer.
*
* The implementation of Producer is considered as experimental. The Undertow
* client classes are not thread safe, their purpose is for the reverse proxy
* usage inside Undertow itself. This may change in the future versions and
* general purpose HTTP client wrapper will be added. Therefore this Producer
* may be changed too.
*/
public class UndertowProducer extends DefaultAsyncProducer {
private UndertowClient client;
private final UndertowEndpoint endpoint;
private final OptionMap options;
private DefaultByteBufferPool pool;
private XnioSsl ssl;
private XnioWorker worker;
private CamelWebSocketHandler webSocketHandler;
public UndertowProducer(final UndertowEndpoint endpoint, final OptionMap options) {
super(endpoint);
this.endpoint = endpoint;
this.options = options;
}
@Override
public UndertowEndpoint getEndpoint() {
return endpoint;
}
boolean isSendToAll(Message in) {
// header may be null; have to be careful here (and fallback to use sendToAll option configured from endpoint)
Boolean value = in.getHeader(UndertowConstants.SEND_TO_ALL, endpoint.getSendToAll(), Boolean.class);
return value == null ? false : value;
}
@Override
public boolean process(final Exchange camelExchange, final AsyncCallback callback) {
if (endpoint.isWebSocket()) {
return processWebSocket(camelExchange, callback);
}
/* not a WebSocket */
final URI uri;
final HttpString method;
try {
final String exchangeUri = UndertowHelper.createURL(camelExchange, getEndpoint());
uri = UndertowHelper.createURI(camelExchange, exchangeUri, getEndpoint());
method = UndertowHelper.createMethod(camelExchange, endpoint, camelExchange.getIn().getBody() != null);
} catch (final URISyntaxException e) {
camelExchange.setException(e);
callback.done(true);
return true;
}
final String pathAndQuery = URISupport.pathAndQueryOf(uri);
final UndertowHttpBinding undertowHttpBinding = endpoint.getUndertowHttpBinding();
final CookieHandler cookieHandler = endpoint.getCookieHandler();
final Map<String, List<String>> cookieHeaders;
if (cookieHandler != null) {
try {
cookieHeaders = cookieHandler.loadCookies(camelExchange, uri);
} catch (final IOException e) {
camelExchange.setException(e);
callback.done(true);
return true;
}
} else {
cookieHeaders = Collections.emptyMap();
}
final ClientRequest request = new ClientRequest();
request.setMethod(method);
request.setPath(pathAndQuery);
final HeaderMap requestHeaders = request.getRequestHeaders();
// Set the Host header
final Message message = camelExchange.getIn();
final String host = message.getHeader(Headers.HOST_STRING, String.class);
if (endpoint.isPreserveHostHeader()) {
requestHeaders.put(Headers.HOST, Optional.ofNullable(host).orElseGet(uri::getAuthority));
} else {
requestHeaders.put(Headers.HOST, uri.getAuthority());
}
cookieHeaders.forEach((key, values) -> {
requestHeaders.putAll(HttpString.tryFromString(key), values);
});
final Object body = undertowHttpBinding.toHttpRequest(request, camelExchange.getIn());
final UndertowClientCallback clientCallback;
final boolean streaming = getEndpoint().isUseStreaming();
if (streaming && (body instanceof InputStream)) {
// For streaming, make it chunked encoding instead of specifying content length
requestHeaders.put(Headers.TRANSFER_ENCODING, "chunked");
clientCallback = new UndertowStreamingClientCallback(camelExchange, callback, getEndpoint(),
request, (InputStream) body);
} else {
final TypeConverter tc = endpoint.getCamelContext().getTypeConverter();
final ByteBuffer bodyAsByte = tc.tryConvertTo(ByteBuffer.class, body);
// As tryConvertTo is used to convert the body, we should do null check
// or the call bodyAsByte.remaining() may throw an NPE
if (body != null && bodyAsByte != null) {
requestHeaders.put(Headers.CONTENT_LENGTH, bodyAsByte.remaining());
}
if (streaming) {
// response may receive streaming
clientCallback = new UndertowStreamingClientCallback(camelExchange, callback, getEndpoint(),
request, bodyAsByte);
} else {
clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(),
request, bodyAsByte);
}
}
if (log.isDebugEnabled()) {
log.debug("Executing http {} method: {}", method, pathAndQuery);
}
// when connect succeeds or fails UndertowClientCallback will
// get notified on a I/O thread run by Xnio worker. The writing
// of request and reading of response is performed also in the
// callback
client.connect(clientCallback, uri, worker, ssl, pool, options);
// the call above will proceed on Xnio I/O thread we will
// notify the exchange asynchronously when the HTTP exchange
// ends with success or failure from UndertowClientCallback
return false;
}
private boolean processWebSocket(final Exchange camelExchange, final AsyncCallback camelCallback) {
final Message in = camelExchange.getIn();
try {
Object message = in.getBody();
if (!(message instanceof String || message instanceof byte[] || message instanceof Reader
|| message instanceof InputStream)) {
message = in.getBody(String.class);
}
if (message != null) {
final int timeout = endpoint.getSendTimeout();
if (isSendToAll(in)) {
return webSocketHandler.send(peer -> true, message, timeout, camelExchange, camelCallback);
}
final List<String> connectionKeys = in.getHeader(UndertowConstants.CONNECTION_KEY_LIST, List.class);
if (connectionKeys != null) {
return webSocketHandler.send(
peer -> connectionKeys.contains(peer.getAttribute(UndertowConstants.CONNECTION_KEY)), message,
timeout, camelExchange, camelCallback);
}
final String connectionKey = in.getHeader(UndertowConstants.CONNECTION_KEY, String.class);
if (connectionKey != null) {
return webSocketHandler.send(
peer -> connectionKey.equals(peer.getAttribute(UndertowConstants.CONNECTION_KEY)), message,
timeout, camelExchange, camelCallback);
}
throw new IllegalStateException(
String.format("Cannot process message which has none of the headers %s, %s or %s set: %s",
UndertowConstants.SEND_TO_ALL, UndertowConstants.CONNECTION_KEY_LIST,
UndertowConstants.CONNECTION_KEY, in));
} else {
/* nothing to do for a null body */
camelCallback.done(true);
return true;
}
} catch (Exception e) {
camelExchange.setException(e);
camelCallback.done(true);
return true;
}
}
@Override
protected void doStart() throws Exception {
super.doStart();
// as in Undertow tests
pool = new DefaultByteBufferPool(true, 17 * 1024);
final Xnio xnio = Xnio.getInstance();
worker = xnio.createWorker(options);
final SSLContext sslContext = getEndpoint().getSslContext();
if (sslContext != null) {
ssl = new UndertowXnioSsl(xnio, options, sslContext);
}
client = UndertowClient.getInstance();
if (endpoint.isWebSocket()) {
this.webSocketHandler = (CamelWebSocketHandler) endpoint.getComponent().registerEndpoint(endpoint.getHttpHandlerRegistrationInfo(), endpoint.getSslContext(), new CamelWebSocketHandler());
}
log.debug("Created worker: {} with options: {}", worker, options);
}
@Override
protected void doStop() throws Exception {
super.doStop();
if (endpoint.isWebSocket()) {
endpoint.getComponent().unregisterEndpoint(endpoint.getHttpHandlerRegistrationInfo(), endpoint.getSslContext());
}
if (worker != null && !worker.isShutdown()) {
log.debug("Shutting down worker: {}", worker);
worker.shutdown();
}
}
}