blob: 23fa141fd0b4bfb05a29d4d9a6a477aacadf8f43 [file] [log] [blame]
package geode.kafka.source;
import geode.kafka.GeodeContext;
import geode.kafka.GeodeConnectorConfig;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
/**
 * Kafka Connect source task that registers continuous queries (CQs) on the configured Geode
 * regions, buffers the resulting events in a bounded queue, and turns them into
 * {@link SourceRecord}s on {@link #poll()}.
 */
public class GeodeKafkaSourceTask extends SourceTask {

    private static final String TASK_PREFIX = "TASK";
    private static final String DOT = ".";

    // The same offset map instance ("OFFSET" -> 0) is attached to every record produced by poll().
    private static final Map<String, Long> OFFSET_DEFAULT = createOffset();

    // NOTE(review): never assigned anywhere in this class, so it keeps its default of 0 and every
    // task generates the same CQ names for a given prefix and region.
    private int taskId;
    private GeodeContext geodeContext;
    private List<String> topics;
    private Map<String, Map<String, String>> sourcePartitions;
    // Per-task buffer. Kept as an instance field so that several tasks running in the same JVM
    // do not replace each other's queue when start() is called.
    private BlockingQueue<GeodeEvent> eventBuffer;
    private int batchSize;

    /**
     * Builds the offset map used for every emitted record.
     *
     * @return a map containing the single entry {@code "OFFSET" -> 0}
     */
    private static Map<String, Long> createOffset() {
        Map<String, Long> offset = new HashMap<>();
        offset.put("OFFSET", 0L);
        return offset;
    }

    /**
     * @return always {@code null}; this task does not report a version string
     */
    @Override
    public String version() {
        return null;
    }

    /**
     * Reads the task configuration, creates the Geode context and the event buffer, and installs
     * one CQ per configured region.
     *
     * @param props task properties; {@code BATCH_SIZE} and {@code QUEUE_SIZE} are parsed as
     *        integers and {@code CQ_PREFIX} is used as the prefix of the generated CQ names
     */
    @Override
    public void start(Map<String, String> props) {
        try {
            GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props);
            geodeContext = new GeodeContext(geodeConnectorConfig);

            batchSize = Integer.parseInt(props.get(BATCH_SIZE));
            int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
            eventBuffer = new LinkedBlockingQueue<>(queueSize);

            sourcePartitions = createSourcePartitionsMap(geodeConnectorConfig.getRegionNames());
            topics = geodeConnectorConfig.getTopics();

            String cqPrefix = props.get(CQ_PREFIX);
            installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix);
        } catch (Exception e) {
            // Printed before rethrowing so the failure shows up on stdout/stderr as well.
            System.out.println("Exception:" + e);
            e.printStackTrace();
            throw e;
        }
    }

    /**
     * Drains up to {@code batchSize} buffered events and creates one record per event per
     * configured topic. Does not block when the buffer is empty.
     *
     * @return the records built from the drained events, or {@code null} when nothing was drained
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<GeodeEvent> events = new ArrayList<>(batchSize);
        if (eventBuffer.drainTo(events, batchSize) <= 0) {
            return null;
        }

        List<SourceRecord> records = new ArrayList<>(events.size());
        for (GeodeEvent event : events) {
            Map<String, String> sourcePartition = sourcePartitions.get(event.getRegionName());
            for (String topic : topics) {
                records.add(new SourceRecord(sourcePartition, OFFSET_DEFAULT, topic, null, event.getEvent()));
            }
        }
        return records;
    }

    /**
     * Closes the Geode client cache, passing {@code true} to {@code close}. Does nothing when the
     * context was never created, e.g. because {@link #start(Map)} failed early or was not called.
     */
    @Override
    public void stop() {
        if (geodeContext != null) {
            geodeContext.getClientCache().close(true);
        }
    }

    /**
     * Installs a CQ listener for every region named in the configuration and, when the
     * configuration is durable, calls {@code readyForEvents()} on the client cache afterwards.
     *
     * @param geodeConnectorConfig source of the region names and the durable flag
     * @param geodeContext context used to create the CQs
     * @param eventBuffer queue handed to each listener
     * @param cqPrefix prefix for the generated CQ names
     */
    void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue<GeodeEvent> eventBuffer, String cqPrefix) {
        boolean isDurable = geodeConnectorConfig.isDurable();
        for (String region : geodeConnectorConfig.getRegionNames()) {
            installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, isDurable);
        }
        if (isDurable) {
            geodeContext.getClientCache().readyForEvents();
        }
    }

    /**
     * Creates a CQ named by {@link #generateCqName(int, String, String)} with the query
     * {@code select * from /<regionName>} and a {@link GeodeKafkaSourceListener} attached.
     *
     * @param geodeContext context used to create the CQ
     * @param taskId task id embedded in the CQ name
     * @param eventBuffer queue handed to the listener
     * @param regionName region to query
     * @param cqPrefix prefix of the CQ name
     * @param isDurable passed through to {@code GeodeContext.newCq}
     */
    void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
        CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
        cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
        CqAttributes cqAttributes = cqAttributesFactory.create();

        String cqName = generateCqName(taskId, cqPrefix, regionName);
        String query = "select * from /" + regionName;
        geodeContext.newCq(cqName, query, cqAttributes, isDurable);
    }

    /**
     * Converts a list of region names into a map of source partitions.
     *
     * @param regionNames list of region names
     * @return a {@code Map<String, Map<String, String>>} of source partitions keyed by region
     *         name; each partition holds the single entry {@code REGION_NAME -> regionName}
     * @throws IllegalStateException if the same region name occurs more than once
     */
    Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) {
        return regionNames.stream()
            .map(regionName -> {
                Map<String, String> sourcePartition = new HashMap<>();
                sourcePartition.put(REGION_NAME, regionName);
                return sourcePartition;
            })
            .collect(Collectors.toMap(partition -> partition.get(REGION_NAME), partition -> partition));
    }

    /**
     * Builds a CQ name of the form {@code <cqPrefix>.TASK<taskId>.<regionName>}.
     *
     * @param taskId task id placed after the {@code TASK} marker
     * @param cqPrefix leading part of the name
     * @param regionName trailing part of the name
     * @return the composed CQ name
     */
    String generateCqName(int taskId, String cqPrefix, String regionName) {
        return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
    }
}