blob: 635f7ee8c036b5de856cf0cd12471ba7e233d2d2 [file] [log] [blame]
package org.apache.wayang.hackit.shipper.kafka.sender;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.wayang.plugin.hackit.core.sniffer.shipper.PSProtocol;
import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender;
import java.util.*;
import java.util.stream.Collectors;
public class SenderKafka<K, T> implements Sender<T>, PSProtocol {
//TODO Get from configuration
static Map<String, String> KAFKA_MAPPING;
static {
KAFKA_MAPPING = new HashMap<>();
KAFKA_MAPPING.put("127.0.0.1", "127.0.0.1");
}
static Integer numPartitions = 1;
static Short replicationFactor = 1;
Producer<K, T> producer;
Properties config;
List<String> topics;
public SenderKafka(Properties config){
this.config = config;
this.topics = new ArrayList<>();
}
public PSProtocol preAddTopic(String... topic) {
AdminClient ad = AdminClient.create(config);
List<String> topicsToCreate = new ArrayList<>();
Collection<NewTopic> l = Arrays.stream(topic)
.map(t -> {
topicsToCreate.add(t);
return new NewTopic(t, numPartitions, replicationFactor);
})
.collect(Collectors.toList());
try {
final CreateTopicsResult result = ad.createTopics(l);
result.all().get();
this.topics.addAll(topicsToCreate);
topicsToCreate.clear();
} catch (final Exception e) {
throw new RuntimeException("Failed to create topic:" + topic, e);
}
return this;
}
@Override
public PSProtocol addTopic(String... topic) {
this.topics.addAll(Arrays.stream(topic).collect(Collectors.toList()));
return this;
}
/*Not used by Kafka*/
@Override
public PSProtocol addExchange(String exchange) {
return null;
}
@Override
public void init() {
this.producer = new KafkaProducer<>(config);
}
@Override
public void send(T value) {
for (String topic : topics) {
producer.send(
new ProducerRecord<>(topic, null, value));
}
}
public void send(K key, T value) {
for (String topic : topics) {
producer.send(
new ProducerRecord<>(topic, key, value));
// System.out.println(value + " sent to " + topic);
}
}
@Override
public void close() {
producer.close();
}
}