blob: 9031936c21ee4a47391dd87a9df0cf01631b834f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.services.kafka;
import java.util.function.Consumer;
import com.github.dockerjava.api.command.CreateContainerCmd;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
/**
 * Test container that runs a single Kafka broker from the Strimzi image
 * {@code strimzi/kafka:0.11.4-kafka-2.1.0}.
 *
 * <p>The broker is started with {@code bin/kafka-server-start.sh} and its listener,
 * advertised listener and ZooKeeper connection settings are passed through environment
 * variables that are expanded by the shell inside the container. The ZooKeeper address is
 * fixed to {@code zookeeper:2181}; presumably a ZooKeeper container with that host name is
 * attached to the same {@link Network} (confirm against the code that wires the services).
 *
 * <p>The Kafka port is bound to the same port number on the host when the container is
 * started, which matches the {@code localhost} address advertised by the broker.
 */
public class StrimziContainer extends GenericContainer<StrimziContainer> {
    private static final String STRIMZI_CONTAINER = "strimzi/kafka:0.11.4-kafka-2.1.0";
    private static final int KAFKA_PORT = 9092;

    /** Set once the fixed host port binding has been added, so it is never added twice. */
    private boolean fixedPortRegistered;

    /**
     * Configures (but does not start) the Kafka broker container.
     *
     * @param network the network the container is attached to
     * @param name    the value used both as the container name and as its host name
     */
    public StrimziContainer(Network network, String name) {
        super(STRIMZI_CONTAINER);

        withEnv("LOG_DIR", "/tmp/logs");
        // Build the listener addresses from KAFKA_PORT so they cannot drift from the port binding.
        withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://localhost:" + KAFKA_PORT);
        withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT);
        withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181");
        withNetwork(network);

        // Give the container a stable name and host name when it is created.
        Consumer<CreateContainerCmd> nameModifier = createContainerCmd -> {
            createContainerCmd.withHostName(name);
            createContainerCmd.withName(name);
        };
        withCreateContainerCmdModifier(nameModifier);

        // The ${...} placeholders are resolved by the shell in the container from the
        // environment variables set above.
        withCommand("sh", "-c",
                "bin/kafka-server-start.sh config/server.properties "
                        + "--override listeners=${KAFKA_LISTENERS} "
                        + "--override advertised.listeners=${KAFKA_ADVERTISED_LISTENERS} "
                        + "--override zookeeper.connect=${KAFKA_ZOOKEEPER_CONNECT}");

        waitingFor(Wait.forListeningPort());
    }

    /**
     * Returns the host port mapped to the Kafka port of the container.
     *
     * @return the mapped port for the container port {@code 9092}
     */
    public int getKafkaPort() {
        return getMappedPort(KAFKA_PORT);
    }

    /**
     * Binds the Kafka port to the same port number on the host and starts the container.
     *
     * <p>The fixed binding is registered only on the first call; later calls go straight to
     * {@code super.start()} so the same binding is not added more than once.
     */
    @Override
    public void start() {
        if (!fixedPortRegistered) {
            addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
            fixedPortRegistered = true;
        }

        super.start();
    }
}