Added support for testing the Kafka service using Strimzi

Also updates the default Kafka service to use the containers from the Strimzi project: fix #43
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/KafkaServiceFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/KafkaServiceFactory.java
index f3ab9a7..623abcd 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/KafkaServiceFactory.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/KafkaServiceFactory.java
@@ -28,7 +28,12 @@
 
     public static KafkaService createService() {
         String kafkaRemote = System.getProperty("kafka.instance.type");
-        if (kafkaRemote == null || kafkaRemote.equals("local-kafka-container")) {
+
+        if (kafkaRemote == null || kafkaRemote.equals("local-strimzi-container")) {
+            return new StrimziService();
+        }
+
+        if (kafkaRemote.equals("local-kafka-container")) {
             return new ContainerLocalKafkaService();
         }
 
@@ -36,8 +41,7 @@
             return new RemoteKafkaService();
         }
 
-        LOG.error("Invalid Kafka instance type: {}. Must be one of 'local-kafka-container' or 'remote",
-                kafkaRemote);
+        LOG.error("Invalid Kafka instance must be one of 'local-strimzi-container', 'local-kafka-container' or 'remote");
         throw new UnsupportedOperationException("Invalid Kafka instance type:");
     }
 }
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziContainer.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziContainer.java
new file mode 100644
index 0000000..40176b9
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziContainer.java
@@ -0,0 +1,68 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.kafka;
+
+import java.util.function.Consumer;
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+
+public class StrimziContainer extends GenericContainer {
+    private static final String STRIMZI_CONTAINER = "strimzi/kafka:0.11.4-kafka-2.1.0";
+    private static final int KAFKA_PORT = 9092;
+
+    public StrimziContainer(Network network, String name) {
+        super(STRIMZI_CONTAINER);
+
+        withEnv("LOG_DIR", "/tmp/logs");
+        withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://localhost:9092");
+        withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092");
+        withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181");
+        withNetwork(network);
+
+        withCreateContainerCmdModifier(
+                new Consumer<CreateContainerCmd>() {
+                    @Override
+                    public void accept(CreateContainerCmd createContainerCmd) {
+                        createContainerCmd.withHostName(name);
+                        createContainerCmd.withName(name);
+                    }
+                }
+        );
+
+
+        withCommand("sh", "-c",
+                "bin/kafka-server-start.sh config/server.properties "
+                        + "--override listeners=${KAFKA_LISTENERS} "
+                        + "--override advertised.listeners=${KAFKA_ADVERTISED_LISTENERS} "
+                        + "--override zookeeper.connect=${KAFKA_ZOOKEEPER_CONNECT}");
+    }
+
+
+
+    public int getKafkaPort() {
+        return getMappedPort(KAFKA_PORT);
+    }
+
+
+    @Override
+    public void start() {
+        addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
+        super.start();
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziService.java
new file mode 100644
index 0000000..7b02eef
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/StrimziService.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.kafka;
+
+import org.apache.camel.kafkaconnector.ContainerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+public class StrimziService implements KafkaService {
+    private static final Logger LOG = LoggerFactory.getLogger(StrimziService.class);
+    private final ZookeeperContainer zookeeperContainer;
+    private final StrimziContainer strimziContainer;
+
+    public StrimziService() {
+        Network network = Network.newNetwork();
+
+        zookeeperContainer = new ZookeeperContainer(network, "zookeeper");
+        strimziContainer = new StrimziContainer(network, "strimzi");
+    }
+
+    private Integer getKafkaPort() {
+        return strimziContainer.getKafkaPort();
+    }
+
+    @Override
+    public String getBootstrapServers() {
+        return "localhost:" + getKafkaPort();
+    }
+
+    @Override
+    public void initialize() {
+        zookeeperContainer.start();
+        ContainerUtil.waitForInitialization(zookeeperContainer);
+
+        String zookeeperConnect = "zookeeper:" + zookeeperContainer.getZookeeperPort();
+        LOG.info("Apache Zookeeper running at address {}", zookeeperConnect);
+
+        strimziContainer.start();
+        ContainerUtil.waitForInitialization(strimziContainer);
+
+        LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers());
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/ZookeeperContainer.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/ZookeeperContainer.java
new file mode 100644
index 0000000..45cb97f
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafka/ZookeeperContainer.java
@@ -0,0 +1,55 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.kafka;
+
+import java.util.function.Consumer;
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class ZookeeperContainer extends GenericContainer {
+    private static final String ZOOKEEPER_CONTAINER = "strimzi/kafka:0.11.4-kafka-2.1.0";
+    private static final int ZOOKEEPER_PORT = 2181;
+
+    public ZookeeperContainer(Network network, String name) {
+        super(ZOOKEEPER_CONTAINER);
+
+        withEnv("LOG_DIR", "/tmp/logs");
+        withExposedPorts(ZOOKEEPER_PORT);
+        withNetwork(network);
+        withCreateContainerCmdModifier(
+                new Consumer<CreateContainerCmd>() {
+                    @Override
+                    public void accept(CreateContainerCmd createContainerCmd) {
+                        createContainerCmd.withHostName(name);
+                        createContainerCmd.withName(name);
+                    }
+                }
+        );
+
+        withCommand( "sh", "-c",
+                "bin/zookeeper-server-start.sh config/zookeeper.properties");
+    }
+
+
+    public int getZookeeperPort() {
+        return getMappedPort(ZOOKEEPER_PORT);
+    }
+
+}