Added support for testing the Kafka service using Strimzi
Also updates the default Kafka service to use the containers from the Strimzi project: fix #43
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/KafkaServiceFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/KafkaServiceFactory.java
index f3ab9a7..623abcd 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/KafkaServiceFactory.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/KafkaServiceFactory.java
@@ -28,7 +28,12 @@
public static KafkaService createService() {
String kafkaRemote = System.getProperty("kafka.instance.type");
- if (kafkaRemote == null || kafkaRemote.equals("local-kafka-container")) {
+
+ if (kafkaRemote == null || kafkaRemote.equals("local-strimzi-container")) {
+ return new StrimziService();
+ }
+
+ if (kafkaRemote.equals("local-kafka-container")) {
return new ContainerLocalKafkaService();
}
@@ -36,8 +41,7 @@
return new RemoteKafkaService();
}
- LOG.error("Invalid Kafka instance type: {}. Must be one of 'local-kafka-container' or 'remote",
- kafkaRemote);
+ LOG.error("Invalid Kafka instance must be one of 'local-strimzi-container', 'local-kafka-container' or 'remote");
throw new UnsupportedOperationException("Invalid Kafka instance type:");
}
}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziContainer.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziContainer.java
new file mode 100644
index 0000000..40176b9
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziContainer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.kafka;
+
+import java.util.function.Consumer;
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+
+public class StrimziContainer extends GenericContainer {
+ private static final String STRIMZI_CONTAINER = "strimzi/kafka:0.11.4-kafka-2.1.0";
+ private static final int KAFKA_PORT = 9092;
+
+ public StrimziContainer(Network network, String name) {
+ super(STRIMZI_CONTAINER);
+
+ withEnv("LOG_DIR", "/tmp/logs");
+ withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://localhost:9092");
+ withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092");
+ withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181");
+ withNetwork(network);
+
+ withCreateContainerCmdModifier(
+ new Consumer<CreateContainerCmd>() {
+ @Override
+ public void accept(CreateContainerCmd createContainerCmd) {
+ createContainerCmd.withHostName(name);
+ createContainerCmd.withName(name);
+ }
+ }
+ );
+
+
+ withCommand("sh", "-c",
+ "bin/kafka-server-start.sh config/server.properties "
+ + "--override listeners=${KAFKA_LISTENERS} "
+ + "--override advertised.listeners=${KAFKA_ADVERTISED_LISTENERS} "
+ + "--override zookeeper.connect=${KAFKA_ZOOKEEPER_CONNECT}");
+ }
+
+
+
+ public int getKafkaPort() {
+ return getMappedPort(KAFKA_PORT);
+ }
+
+
+ @Override
+ public void start() {
+ addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
+ super.start();
+ }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziService.java
new file mode 100644
index 0000000..7b02eef
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziService.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.kafka;
+
+import org.apache.camel.kafkaconnector.ContainerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+public class StrimziService implements KafkaService {
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziService.class);
+ private final ZookeeperContainer zookeeperContainer;
+ private final StrimziContainer strimziContainer;
+
+ public StrimziService() {
+ Network network = Network.newNetwork();
+
+ zookeeperContainer = new ZookeeperContainer(network, "zookeeper");
+ strimziContainer = new StrimziContainer(network, "strimzi");
+ }
+
+ private Integer getKafkaPort() {
+ return strimziContainer.getKafkaPort();
+ }
+
+ @Override
+ public String getBootstrapServers() {
+ return "localhost:" + getKafkaPort();
+ }
+
+ @Override
+ public void initialize() {
+ zookeeperContainer.start();
+ ContainerUtil.waitForInitialization(zookeeperContainer);
+
+ String zookeeperConnect = "zookeeper:" + zookeeperContainer.getZookeeperPort();
+ LOG.info("Apache Zookeeper running at address {}", zookeeperConnect);
+
+ strimziContainer.start();
+ ContainerUtil.waitForInitialization(strimziContainer);
+
+ LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers());
+ }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/ZookeeperContainer.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/ZookeeperContainer.java
new file mode 100644
index 0000000..45cb97f
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/ZookeeperContainer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.kafka;
+
+import java.util.function.Consumer;
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class ZookeeperContainer extends GenericContainer {
+ private static final String ZOOKEEPER_CONTAINER = "strimzi/kafka:0.11.4-kafka-2.1.0";
+ private static final int ZOOKEEPER_PORT = 2181;
+
+ public ZookeeperContainer(Network network, String name) {
+ super(ZOOKEEPER_CONTAINER);
+
+ withEnv("LOG_DIR", "/tmp/logs");
+ withExposedPorts(ZOOKEEPER_PORT);
+ withNetwork(network);
+ withCreateContainerCmdModifier(
+ new Consumer<CreateContainerCmd>() {
+ @Override
+ public void accept(CreateContainerCmd createContainerCmd) {
+ createContainerCmd.withHostName(name);
+ createContainerCmd.withName(name);
+ }
+ }
+ );
+
+ withCommand( "sh", "-c",
+ "bin/zookeeper-server-start.sh config/zookeeper.properties");
+ }
+
+
+ public int getZookeeperPort() {
+ return getMappedPort(ZOOKEEPER_PORT);
+ }
+
+}