[WAYANG-28] Kafka Shipper implementation and tests
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafka.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafka.java
new file mode 100644
index 0000000..49dab87
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafka.java
@@ -0,0 +1,89 @@
+package org.apache.wayang.hackit.shipper.kafka.receiver;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.PSProtocol;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver.Receiver;
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple;
+import org.apache.wayang.plugin.hackit.core.tuple.header.Header;
+
+import java.util.*;
+
+public class ReceiverKafka<K, T> extends Receiver<HackitTuple<K, T>> implements PSProtocol {
+
+    //TODO Get from configuration
+    static Map<String, String> KAFKA_MAPPING;
+    static {
+        KAFKA_MAPPING = new HashMap<>();
+        KAFKA_MAPPING.put("127.0.0.1", "127.0.0.1");
+    }
+    static Integer numPartitions = 1;
+    static Short replicationFactor = 1;
+
+
+    Consumer<K, T> consumer;
+    Properties config;
+    List<String> topics;
+
+    public ReceiverKafka(Properties config){
+        this.config = config;
+        this.topics = new ArrayList<>();
+    }
+
+
+    @Override
+    public PSProtocol addTopic(String... topic) {
+
+        this.topics.addAll(Arrays.asList(topic));
+        this.consumer.subscribe(this.topics);
+        return this;
+    }
+
+    @Override
+    public PSProtocol addExchange(String exchange) {
+        return null;
+    }
+
+    @Override
+    public void init() {
+        this.consumer =
+                new KafkaConsumer<>(config);
+    }
+
+    @Override
+    public Iterator<HackitTuple<K, T>> getElements() {
+
+        final int giveUp = 100;   int noRecordsCount = 0;
+        ConsumerRecords<K, T> consumerRecords;
+        while (true) {
+            consumerRecords =
+                    consumer.poll(1000);
+
+            if (consumerRecords.count()==0) {
+                noRecordsCount++;
+                if (noRecordsCount > giveUp) break;
+                else continue;
+            }
+
+            List<HackitTuple<K, T>> list = new ArrayList<>();
+            consumerRecords.forEach(record ->{
+                HackitTuple<K, T> result = new HackitTuple<>(
+                        record.value()
+                );
+                // System.out.println("received " + record.value());
+                list.add(result);
+            });
+
+            consumer.commitAsync();
+
+            return list.listIterator();
+        }
+        return null;
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafka.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafka.java
new file mode 100644
index 0000000..635f7ee
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafka.java
@@ -0,0 +1,100 @@
+package org.apache.wayang.hackit.shipper.kafka.sender;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.PSProtocol;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class SenderKafka<K, T> implements Sender<T>, PSProtocol {
+
+    //TODO Get from configuration
+    static Map<String, String> KAFKA_MAPPING;
+    static {
+        KAFKA_MAPPING = new HashMap<>();
+        KAFKA_MAPPING.put("127.0.0.1", "127.0.0.1");
+    }
+    static Integer numPartitions = 1;
+    static Short replicationFactor = 1;
+
+
+    Producer<K, T> producer;
+    Properties config;
+    List<String> topics;
+
+    public SenderKafka(Properties config){
+        this.config = config;
+        this.topics = new ArrayList<>();
+    }
+
+    public PSProtocol preAddTopic(String... topic) {
+        AdminClient ad = AdminClient.create(config);
+        List<String> topicsToCreate = new ArrayList<>();
+        Collection<NewTopic> l = Arrays.stream(topic)
+                .map(t -> {
+                    topicsToCreate.add(t);
+                    return new NewTopic(t, numPartitions, replicationFactor);
+                })
+                .collect(Collectors.toList());
+
+        try {
+            final CreateTopicsResult result = ad.createTopics(l);
+            result.all().get();
+            this.topics.addAll(topicsToCreate);
+            topicsToCreate.clear();
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to create topic:" + topic, e);
+        }
+
+        return this;
+    }
+
+    @Override
+    public PSProtocol addTopic(String... topic) {
+
+        this.topics.addAll(Arrays.stream(topic).collect(Collectors.toList()));
+        return this;
+    }
+
+    /*Not used by Kafka*/
+    @Override
+    public PSProtocol addExchange(String exchange) {
+        return null;
+    }
+
+    @Override
+    public void init() {
+        this.producer = new KafkaProducer<>(config);
+    }
+
+    @Override
+    public void send(T value) {
+
+        for (String topic : topics) {
+            producer.send(
+                    new ProducerRecord<>(topic, null, value));
+        }
+
+    }
+
+    public void send(K key, T value) {
+
+        for (String topic : topics) {
+            producer.send(
+                    new ProducerRecord<>(topic, key, value));
+            // System.out.println(value + " sent to " + topic);
+        }
+
+    }
+
+    @Override
+    public void close() {
+        producer.close();
+    }
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafkaTest.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafkaTest.java
new file mode 100644
index 0000000..84f4a82
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafkaTest.java
@@ -0,0 +1,32 @@
+package org.apache.wayang.hackit.shipper.kafka.receiver;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class ReceiverKafkaTest {
+
+    @Test
+    void receiveMessage() {
+
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG,  "1");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+        ReceiverKafka<String, String> receiver = new ReceiverKafka<>(props);
+        receiver.init();
+        receiver.addTopic("debug");
+        Iterator<HackitTuple<String, String>> results = receiver.getElements();
+        results.forEachRemaining(t -> System.out.println(t.getValue()));
+        receiver.close();
+    }
+}
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafkaTest.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafkaTest.java
new file mode 100644
index 0000000..d1803e4
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafkaTest.java
@@ -0,0 +1,37 @@
+package org.apache.wayang.hackit.shipper.kafka.sender;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class SenderKafkaTest {
+
+    @Test
+    void sendMessage() {
+
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        //Set acknowledgements for producer requests.
+        props.put(ProducerConfig.ACKS_CONFIG,  "1");
+        //If the request fails, the producer can automatically retry,
+        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 12);
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
+        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+        SenderKafka<String, String> sender = new SenderKafka<>(props);
+        sender.init();
+        sender.addTopic("debug");
+        sender.send("perro", "gato");
+        sender.send("pulpo", "atun");
+        sender.close();
+    }
+
+}
\ No newline at end of file