blob: 02a07c3888da8b1fc9fb570496ccffe946d02799 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.FileSystem;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.KeyStoreOptions;
import io.vertx.core.net.OpenSSLEngineOptions;
import io.vertx.core.net.TrustOptions;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.ext.web.client.predicate.ResponsePredicateResult;
import io.vertx.ext.web.codec.BodyCodec;
import org.apache.cassandra.sidecar.client.request.Request;
import org.apache.cassandra.sidecar.client.request.UploadableRequest;
/**
* An {@link HttpClient} implementation that uses vertx's WebClient internally
*/
public class VertxHttpClient implements HttpClient
{
private static final Logger LOGGER = LoggerFactory.getLogger(VertxHttpClient.class);
protected final Vertx vertx;
protected final WebClient webClient;
protected final HttpClientConfig config;
public VertxHttpClient(Vertx vertx, HttpClientConfig config)
{
WebClientOptions options = new WebClientOptions()
.setMaxPoolSize(config.maxPoolSize())
.setIdleTimeout(config.idleTimeoutMillis())
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
.setMaxChunkSize(config.maxChunkSize())
.setReceiveBufferSize(config.receiveBufferSize())
.setConnectTimeout((int) config.timeoutMillis())
.setUserAgent(config.userAgent());
options = applySSLOptions(options, config);
this.vertx = vertx;
this.webClient = WebClient.create(vertx, options);
this.config = config;
}
public VertxHttpClient(Vertx vertx, WebClient webClient, HttpClientConfig config)
{
this.vertx = vertx;
this.webClient = webClient;
this.config = config;
}
Vertx vertx()
{
return vertx;
}
/**
* {@inheritDoc}
*/
@Override
public HttpClientConfig config()
{
return config;
}
/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<HttpResponse> execute(SidecarInstance sidecarInstance, RequestContext context)
{
if (context.request() instanceof UploadableRequest)
{
HttpRequest<Buffer> vertxRequest = vertxRequest(sidecarInstance, context);
UploadableRequest uploadableRequest = (UploadableRequest) context.request();
LOGGER.debug("Uploading file={}, for request={}, instance={}",
uploadableRequest.filename(), context.request(), sidecarInstance);
return executeUploadFileInternal(sidecarInstance, vertxRequest, uploadableRequest.filename());
}
else
{
LOGGER.debug("Executing request={}, on instance={}", context.request(), sidecarInstance);
return executeInternal(sidecarInstance, context);
}
}
protected CompletableFuture<HttpResponse> executeInternal(SidecarInstance sidecarInstance, RequestContext context)
{
HttpRequest<Buffer> vertxRequest = vertxRequest(sidecarInstance, context);
Promise<HttpResponse> promise = Promise.promise();
vertxRequest.ssl(config.ssl())
.timeout(config.timeoutMillis())
.send()
.onFailure(promise::fail)
.onSuccess(response -> {
byte[] raw = response.body() != null ? response.body().getBytes() : null;
promise.complete(new HttpResponseImpl(response.statusCode(),
response.statusMessage(),
raw,
mapHeaders(response.headers()),
sidecarInstance
));
});
return promise.future().toCompletionStage().toCompletableFuture();
}
protected CompletableFuture<HttpResponse> executeUploadFileInternal(SidecarInstance sidecarInstance,
HttpRequest<Buffer> vertxRequest,
String filename)
{
Promise<HttpResponse> promise = Promise.promise();
// open the local file
openFileForRead(vertx.fileSystem(), filename)
.compose(pair -> vertxRequest.ssl(config.ssl())
.putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(),
String.valueOf(pair.getKey()))
.sendStream(pair.getValue()
.setReadBufferSize(config.sendReadBufferSize())))
.onFailure(promise::fail)
.onSuccess(response -> {
byte[] raw = response.body() != null ? response.body().getBytes() : null;
promise.complete(new HttpResponseImpl(response.statusCode(),
response.statusMessage(),
raw,
mapHeaders(response.headers()),
sidecarInstance
));
});
return promise.future().toCompletionStage().toCompletableFuture();
}
/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<HttpResponse> stream(SidecarInstance sidecarInstance,
RequestContext context,
StreamConsumer streamConsumer)
{
Objects.requireNonNull(streamConsumer, "The streamConsumer must be set");
HttpRequest<Buffer> vertxRequest = vertxRequest(sidecarInstance, context);
LOGGER.debug("Streaming request={}, from instance={}", context.request(), sidecarInstance);
Promise<HttpResponse> promise = Promise.promise();
vertxRequest.ssl(config.ssl())
.timeout(config.timeoutMillis())
.expect(response -> {
// fulfill the promise with the response
promise.complete(new HttpResponseImpl(response.statusCode(),
response.statusMessage(),
mapHeaders(response.headers()),
sidecarInstance));
if (response.statusCode() == HttpResponseStatus.OK.code() ||
response.statusCode() == HttpResponseStatus.PARTIAL_CONTENT.code())
{
return ResponsePredicateResult.success();
}
else
{
LOGGER.warn("Unexpected status code received statusCode={}, statusMessage={}",
response.statusCode(), response.statusMessage());
return ResponsePredicateResult.failure("Unexpected status code: " +
response.statusCode());
}
})
.as(BodyCodec.pipe(new StreamConsumerWriteStream(streamConsumer)))
.send()
.onFailure(throwable -> {
if (!promise.tryFail(throwable))
{
// the stream has already started, we need to signal the consumer that the
// there was a failure mid-stream. This is a non-retryable case
streamConsumer.onError(throwable);
}
});
return promise.future().toCompletionStage().toCompletableFuture();
}
/**
* {@inheritDoc}
*/
@Override
public void close()
{
webClient.close();
}
protected HttpRequest<Buffer> vertxRequest(SidecarInstance sidecarInstance, RequestContext context)
{
Request request = context.request();
HttpMethod method = HttpMethod.valueOf(request.method().name());
HttpRequest<Buffer> vertxRequest = webClient.request(method,
sidecarInstance.port(),
sidecarInstance.hostname(),
request.requestURI());
vertxRequest = applyHeaders(vertxRequest, request.headers());
Map<String, String> customHeaders = context.customHeaders();
if (customHeaders != null && !customHeaders.isEmpty())
{
vertxRequest = applyHeaders(vertxRequest, customHeaders);
}
return vertxRequest;
}
protected HttpRequest<Buffer> applyHeaders(HttpRequest<Buffer> vertxRequest, Map<String, String> headers)
{
if (headers == null || headers.isEmpty())
return vertxRequest;
for (Map.Entry<String, String> header : headers.entrySet())
{
vertxRequest = vertxRequest.putHeader(header.getKey(), header.getValue());
}
return vertxRequest;
}
protected Map<String, List<String>> mapHeaders(MultiMap headers)
{
if (headers == null)
{
return Collections.emptyMap();
}
return headers.entries().stream()
.filter(entry -> entry.getKey() != null && entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey,
entry -> Collections.singletonList(entry.getValue())));
}
protected static WebClientOptions applySSLOptions(WebClientOptions options, HttpClientConfig config)
{
if (!config.ssl())
{
return options;
}
options = options.setSsl(true);
if (config.trustStoreInputStream() != null && config.trustStorePassword() != null)
{
TrustOptions trustOptions = buildKeyCertOptions(config.trustStoreInputStream(),
config.trustStorePassword(),
config.trustStoreType());
options = options.setTrustOptions(trustOptions);
}
if (config.keyStoreInputStream() != null && config.keyStorePassword() != null)
{
KeyCertOptions keyCertOptions = buildKeyCertOptions(config.keyStoreInputStream(),
config.keyStorePassword(),
config.keyStoreType());
options = options.setKeyCertOptions(keyCertOptions);
}
if (OpenSSLEngineOptions.isAvailable())
{
LOGGER.info("Building Sidecar vertx client with OpenSSL");
options = options.setOpenSslEngineOptions(new OpenSSLEngineOptions());
}
else
{
LOGGER.warn("OpenSSL not available when building Sidecar vertx client");
}
return options;
}
protected static KeyStoreOptions buildKeyCertOptions(InputStream storeStream, String storePass, String storeType)
{
try (InputStream inputStream = storeStream)
{
byte[] trustBytes = readStore(inputStream);
return new KeyStoreOptions().setType(storeType)
.setPassword(storePass)
.setValue(Buffer.buffer(trustBytes));
}
catch (IOException e)
{
throw new RuntimeException("Failed to load default truststore.", e);
}
}
protected static byte[] readStore(InputStream storeStream) throws IOException
{
byte[] buffer = new byte[1024];
int temp;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
while ((temp = storeStream.read(buffer)) != -1)
{
bos.write(buffer, 0, temp);
}
return bos.toByteArray();
}
protected Future<AbstractMap.SimpleEntry<Long, AsyncFile>> openFileForRead(FileSystem fs, String filename)
{
Promise<AbstractMap.SimpleEntry<Long, AsyncFile>> promise = Promise.promise();
fs.exists(filename)
.compose(exists -> {
if (!exists)
{
String errMsg = "File '" + filename + "' does not exist";
return Future.failedFuture(new NoSuchFileException(errMsg));
}
return fs.props(filename);
})
.onSuccess(props -> fs.open(filename, new OpenOptions().setWrite(false).setCreate(false).setRead(true))
.onFailure(promise::tryFail)
.onSuccess(asyncFile -> promise.complete(new AbstractMap.SimpleEntry<>(props.size(),
asyncFile))))
.onFailure(promise::tryFail);
return promise.future();
}
}