[WAYANG-28] Kafka Shipper implementation and tests
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafka.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafka.java
new file mode 100644
index 0000000..49dab87
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafka.java
@@ -0,0 +1,89 @@
+package org.apache.wayang.hackit.shipper.kafka.receiver;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.PSProtocol;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver.Receiver;
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple;
+import org.apache.wayang.plugin.hackit.core.tuple.header.Header;
+
+import java.util.*;
+
+public class ReceiverKafka<K, T> extends Receiver<HackitTuple<K, T>> implements PSProtocol {
+
+ //TODO Get from configuration
+ static Map<String, String> KAFKA_MAPPING;
+ static {
+ KAFKA_MAPPING = new HashMap<>();
+ KAFKA_MAPPING.put("127.0.0.1", "127.0.0.1");
+ }
+ static Integer numPartitions = 1;
+ static Short replicationFactor = 1;
+
+
+ Consumer<K, T> consumer;
+ Properties config;
+ List<String> topics;
+
+ public ReceiverKafka(Properties config){
+ this.config = config;
+ this.topics = new ArrayList<>();
+ }
+
+
+ @Override
+ public PSProtocol addTopic(String... topic) {
+
+ this.topics.addAll(Arrays.asList(topic));
+ this.consumer.subscribe(this.topics);
+ return this;
+ }
+
+ @Override
+ public PSProtocol addExchange(String exchange) {
+ return null;
+ }
+
+ @Override
+ public void init() {
+ this.consumer =
+ new KafkaConsumer<>(config);
+ }
+
+ @Override
+ public Iterator<HackitTuple<K, T>> getElements() {
+
+ final int giveUp = 100; int noRecordsCount = 0;
+ ConsumerRecords<K, T> consumerRecords;
+ while (true) {
+ consumerRecords =
+ consumer.poll(1000);
+
+ if (consumerRecords.count()==0) {
+ noRecordsCount++;
+ if (noRecordsCount > giveUp) break;
+ else continue;
+ }
+
+ List<HackitTuple<K, T>> list = new ArrayList<>();
+ consumerRecords.forEach(record ->{
+ HackitTuple<K, T> result = new HackitTuple<>(
+ record.value()
+ );
+ // System.out.println("received " + record.value());
+ list.add(result);
+ });
+
+ consumer.commitAsync();
+
+ return list.listIterator();
+ }
+ return null;
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ }
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafka.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafka.java
new file mode 100644
index 0000000..635f7ee
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafka.java
@@ -0,0 +1,100 @@
+package org.apache.wayang.hackit.shipper.kafka.sender;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.PSProtocol;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class SenderKafka<K, T> implements Sender<T>, PSProtocol {
+
+ //TODO Get from configuration
+ static Map<String, String> KAFKA_MAPPING;
+ static {
+ KAFKA_MAPPING = new HashMap<>();
+ KAFKA_MAPPING.put("127.0.0.1", "127.0.0.1");
+ }
+ static Integer numPartitions = 1;
+ static Short replicationFactor = 1;
+
+
+ Producer<K, T> producer;
+ Properties config;
+ List<String> topics;
+
+ public SenderKafka(Properties config){
+ this.config = config;
+ this.topics = new ArrayList<>();
+ }
+
+ public PSProtocol preAddTopic(String... topic) {
+ AdminClient ad = AdminClient.create(config);
+ List<String> topicsToCreate = new ArrayList<>();
+ Collection<NewTopic> l = Arrays.stream(topic)
+ .map(t -> {
+ topicsToCreate.add(t);
+ return new NewTopic(t, numPartitions, replicationFactor);
+ })
+ .collect(Collectors.toList());
+
+ try {
+ final CreateTopicsResult result = ad.createTopics(l);
+ result.all().get();
+ this.topics.addAll(topicsToCreate);
+ topicsToCreate.clear();
+ } catch (final Exception e) {
+ throw new RuntimeException("Failed to create topic:" + topic, e);
+ }
+
+ return this;
+ }
+
+ @Override
+ public PSProtocol addTopic(String... topic) {
+
+ this.topics.addAll(Arrays.stream(topic).collect(Collectors.toList()));
+ return this;
+ }
+
+ /*Not used by Kafka*/
+ @Override
+ public PSProtocol addExchange(String exchange) {
+ return null;
+ }
+
+ @Override
+ public void init() {
+ this.producer = new KafkaProducer<>(config);
+ }
+
+ @Override
+ public void send(T value) {
+
+ for (String topic : topics) {
+ producer.send(
+ new ProducerRecord<>(topic, null, value));
+ }
+
+ }
+
+ public void send(K key, T value) {
+
+ for (String topic : topics) {
+ producer.send(
+ new ProducerRecord<>(topic, key, value));
+ // System.out.println(value + " sent to " + topic);
+ }
+
+ }
+
+ @Override
+ public void close() {
+ producer.close();
+ }
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafkaTest.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafkaTest.java
new file mode 100644
index 0000000..84f4a82
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/receiver/ReceiverKafkaTest.java
@@ -0,0 +1,32 @@
+package org.apache.wayang.hackit.shipper.kafka.receiver;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.*;
+
/**
 * Integration smoke test for {@link ReceiverKafka}.
 *
 * <p>NOTE(review): requires a live Kafka broker at {@code localhost:9092} with
 * a {@code debug} topic; it makes no assertions (only prints received values),
 * so it verifies "runs without throwing" rather than correctness. Consider an
 * embedded/testcontainers broker plus real assertions.
 */
class ReceiverKafkaTest {

    @Test
    void receiveMessage() {

        // Minimal consumer config: String keys/values, fixed consumer group.
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        ReceiverKafka<String, String> receiver = new ReceiverKafka<>(props);
        receiver.init();
        receiver.addTopic("debug");
        // Drains whatever is on the topic; prints values for manual inspection.
        Iterator<HackitTuple<String, String>> results = receiver.getElements();
        results.forEachRemaining(t -> System.out.println(t.getValue()));
        receiver.close();
    }
}
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafkaTest.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafkaTest.java
new file mode 100644
index 0000000..d1803e4
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/test/java/org/apache/wayang/hackit/shipper/kafka/sender/SenderKafkaTest.java
@@ -0,0 +1,37 @@
+package org.apache.wayang.hackit.shipper.kafka.sender;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class SenderKafkaTest {
+
+ @Test
+ void sendMessage() {
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ //Set acknowledgements for producer requests.
+ props.put(ProducerConfig.ACKS_CONFIG, "1");
+ //If the request fails, the producer can automatically retry,
+ props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 12);
+ props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
+ props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ SenderKafka<String, String> sender = new SenderKafka<>(props);
+ sender.init();
+ sender.addTopic("debug");
+ sender.send("perro", "gato");
+ sender.send("pulpo", "atun");
+ sender.close();
+ }
+
+}
\ No newline at end of file