blob: d5c20a1570df08b69d2481de1b70ca150a4303e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.kafka.ssl;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import com.github.dockerjava.api.command.InspectContainerResponse;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.apache.camel.util.CollectionHelper;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;
/**
 * Quarkus test resource that starts a Kafka broker container with an SSL listener and
 * returns the Camel Kafka component properties needed to connect to it.
 *
 * <p>
 * On start, every file found in the {@code config} classpath directory is copied to a
 * {@code kafka} directory below {@code java.io.tmpdir}; the returned key store and trust
 * store locations point into that directory. On stop, the container is stopped and the
 * directory is deleted.
 */
public class KafkaSslTestResource implements QuarkusTestResourceLifecycleManager {

    private static final String KAFKA_KEYSTORE_FILE = "kafka-keystore.p12";
    private static final String KAFKA_KEYSTORE_PASSWORD = "kafkas3cret";
    private static final String KAFKA_KEYSTORE_TYPE = "PKCS12";
    private static final String KAFKA_SSL_CREDS_FILE = "broker-creds";
    private static final String KAFKA_TRUSTSTORE_FILE = "kafka-truststore.p12";
    private static final File TMP_DIR = Paths.get(System.getProperty("java.io.tmpdir"), "kafka").toFile();

    private SSLKafkaContainer container;

    /**
     * Copies the SSL key / trust stores to the temporary directory, starts the Kafka
     * container and builds the Camel Kafka component configuration.
     *
     * @return the {@code camel.component.kafka.*} properties pointing at the started broker
     * @throws IllegalStateException if the {@code config} classpath directory cannot be resolved or listed
     * @throws RuntimeException      if copying a key store file fails
     */
    @Override
    public Map<String, String> start() {
        // Set up the SSL key / trust store directory
        copyKeyStoresToTmpDir();

        DockerImageName imageName = DockerImageName.parse("confluentinc/cp-kafka").withTag("5.4.3");
        container = new SSLKafkaContainer(imageName);
        container.start();

        Path keystorePath = TMP_DIR.toPath();
        return CollectionHelper.mapOf(
                "camel.component.kafka.brokers", container.getBootstrapServers(),
                "camel.component.kafka.security-protocol", "SSL",
                "camel.component.kafka.ssl-key-password", KAFKA_KEYSTORE_PASSWORD,
                "camel.component.kafka.ssl-keystore-location", keystorePath.resolve(KAFKA_KEYSTORE_FILE).toString(),
                "camel.component.kafka.ssl-keystore-password", KAFKA_KEYSTORE_PASSWORD,
                "camel.component.kafka.ssl-keystore-type", KAFKA_KEYSTORE_TYPE,
                "camel.component.kafka.ssl-truststore-location", keystorePath.resolve(KAFKA_TRUSTSTORE_FILE).toString(),
                "camel.component.kafka.ssl-truststore-password", KAFKA_KEYSTORE_PASSWORD,
                "camel.component.kafka.ssl-truststore-type", KAFKA_KEYSTORE_TYPE);
    }

    /**
     * Stops the container (if one was created) and deletes the temporary key store
     * directory. Both steps are best-effort and independent of each other, so a failure
     * while stopping the container does not leave the temporary directory behind.
     */
    @Override
    public void stop() {
        if (this.container != null) {
            try {
                this.container.stop();
            } catch (Exception ignored) {
                // Best-effort shutdown: a failure here must not prevent the directory cleanup below
            }
        }
        try {
            // Also runs when no container was created, e.g. start() failed after copying the stores
            FileUtils.deleteDirectory(TMP_DIR);
        } catch (Exception ignored) {
            // Best-effort cleanup of a temporary directory
        }
    }

    /** Copies every file of the {@code config} classpath directory into {@link #TMP_DIR}. */
    private static void copyKeyStoresToTmpDir() {
        if (!TMP_DIR.isDirectory() && !TMP_DIR.mkdirs()) {
            throw new IllegalStateException("Unable to create directory " + TMP_DIR);
        }

        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        URL resource = classLoader.getResource("config");
        if (resource == null) {
            throw new IllegalStateException("Classpath resource 'config' not found");
        }

        File serviceBindings;
        try {
            // URL.getPath() stays percent-encoded (e.g. %20 for a space); going through the URI decodes it
            serviceBindings = new File(resource.toURI());
        } catch (java.net.URISyntaxException | IllegalArgumentException e) {
            throw new IllegalStateException("Classpath resource 'config' is not a local directory: " + resource, e);
        }

        // listFiles() returns null when the path is not a directory or cannot be read
        File[] keyStores = serviceBindings.listFiles();
        if (keyStores == null) {
            throw new IllegalStateException("Unable to list files in " + serviceBindings);
        }

        try {
            for (File keyStore : keyStores) {
                String resourceName = "config/" + keyStore.getName();
                URL serviceBindingResource = classLoader.getResource(resourceName);
                if (serviceBindingResource == null) {
                    throw new IllegalStateException("Classpath resource '" + resourceName + "' not found");
                }
                FileUtils.copyInputStreamToFile(serviceBindingResource.openStream(),
                        Paths.get(TMP_DIR.getPath(), keyStore.getName()).toFile());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // KafkaContainer does not support SSL OOTB so we need some customizations
    static final class SSLKafkaContainer extends KafkaContainer {

        /**
         * Configures the broker with an SSL listener on {@code KAFKA_PORT} and a plaintext
         * {@code BROKER} listener on port 9092 that is used as the inter-broker listener.
         *
         * @param dockerImageName the Kafka image to run
         */
        SSLKafkaContainer(final DockerImageName dockerImageName) {
            super(dockerImageName);

            String protocolMap = "SSL:SSL,BROKER:PLAINTEXT";
            String listeners = "SSL://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092";

            withEnv("KAFKA_LISTENERS", listeners);
            withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", protocolMap);
            withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
            // Key, key store and trust store all use the same credentials file and store type
            withEnv("KAFKA_SSL_KEY_CREDENTIALS", KAFKA_SSL_CREDS_FILE);
            withEnv("KAFKA_SSL_KEYSTORE_FILENAME", KAFKA_KEYSTORE_FILE);
            withEnv("KAFKA_SSL_KEYSTORE_CREDENTIALS", KAFKA_SSL_CREDS_FILE);
            withEnv("KAFKA_SSL_KEYSTORE_TYPE", KAFKA_KEYSTORE_TYPE);
            withEnv("KAFKA_SSL_TRUSTSTORE_FILENAME", KAFKA_TRUSTSTORE_FILE);
            withEnv("KAFKA_SSL_TRUSTSTORE_CREDENTIALS", KAFKA_SSL_CREDS_FILE);
            withEnv("KAFKA_SSL_TRUSTSTORE_TYPE", KAFKA_KEYSTORE_TYPE);
            // NOTE(review): empty algorithm presumably disables host name verification - confirm against Kafka docs
            withEnv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", "");
            withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false");
            withEmbeddedZookeeper().waitingFor(Wait.forListeningPort());
            // Forward the container output to the test's standard output
            withLogConsumer(frame -> System.out.print(frame.getUtf8String()));
        }

        /**
         * @return the bootstrap address of the SSL listener, using the container host and
         *         the host port mapped to {@code KAFKA_PORT}
         */
        @Override
        public String getBootstrapServers() {
            return String.format("SSL://%s:%s", getHost(), getMappedPort(KAFKA_PORT));
        }

        /**
         * Copies the key store, the trust store and a credentials file holding the store
         * password into {@code /etc/kafka/secrets/} of the starting container.
         */
        @Override
        protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
            super.containerIsStarting(containerInfo, reused);
            copyFileToContainer(
                    MountableFile.forClasspathResource("config/" + KAFKA_KEYSTORE_FILE),
                    "/etc/kafka/secrets/" + KAFKA_KEYSTORE_FILE);
            copyFileToContainer(
                    MountableFile.forClasspathResource("config/" + KAFKA_TRUSTSTORE_FILE),
                    "/etc/kafka/secrets/" + KAFKA_TRUSTSTORE_FILE);
            copyFileToContainer(
                    Transferable.of(KAFKA_KEYSTORE_PASSWORD.getBytes(StandardCharsets.UTF_8)),
                    "/etc/kafka/secrets/" + KAFKA_SSL_CREDS_FILE);
        }
    }
}