blob: 896b602f589c456d62a06ce05ebcdb2d90c8b974 [file] [log] [blame]
package kafka;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import static kafka.GeodeConnectorConfig.BATCH_SIZE;
import static kafka.GeodeConnectorConfig.CQ_PREFIX;
import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
import static kafka.GeodeConnectorConfig.REGION_NAME;
public class GeodeKafkaSourceTask extends SourceTask {
//property string to pass in to identify task
public static final String TASK_ID = "GEODE_TASK_ID";
private static final String TASK_PREFIX = "TASK";
private static final String DOT = ".";
private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
private int taskId;
private ClientCache clientCache;
private List<String> regionNames;
private List<String> topics;
private Map<String, Map<String, String>> sourcePartitions;
private static BlockingQueue<GeodeEvent> eventBuffer;
private int batchSize;
private static Map<String, Long> createOffset() {
Map<String, Long> offset = new HashMap<>();
offset.put("OFFSET", 0L);
return offset;
}
@Override
public String version() {
return null;
}
@Override
public void start(Map<String, String> props) {
try {
System.out.println("JASON task start");
taskId = Integer.parseInt(props.get(TASK_ID));
batchSize = Integer.parseInt(props.get(BATCH_SIZE));
int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
eventBuffer = new LinkedBlockingQueue<>(queueSize);
//grouping will be done in the source and not the task
regionNames = parseNames(props.get(GeodeConnectorConfig.REGIONS));
topics = parseNames(props.get(GeodeConnectorConfig.TOPICS));
sourcePartitions = createSourcePartitionsMap(regionNames);
String durableClientId = props.get(DURABLE_CLIENT_ID_PREFIX);
if (!durableClientId.equals("")) {
durableClientId += taskId;
}
System.out.println("JASON durable client id is:" + durableClientId);
String durableClientTimeout = props.get(DURABLE_CLIENT_TIME_OUT);
String cqPrefix = props.get(CQ_PREFIX);
List<LocatorHostPort> locators = parseLocators(props.get(GeodeConnectorConfig.LOCATORS));
installOnGeode(taskId, eventBuffer, locators, regionNames, durableClientId, durableClientTimeout, cqPrefix);
System.out.println("JASON task start finished");
}
catch (Exception e) {
System.out.println("Exception:" + e);
e.printStackTrace();
throw e;
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
if (eventBuffer.drainTo(events, batchSize) > 0) {
for (GeodeEvent event : events) {
for (String topic : topics) {
records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
// records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, "STRING"));
}
}
return records;
}
return null;
}
@Override
public void stop() {
clientCache.close(true);
}
ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName)
.set("durable-client-timeout", durableClientTimeOut)
.setPoolSubscriptionEnabled(true);
for (LocatorHostPort locator: locators) {
ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create();
}
return ccf.create();
}
void installOnGeode(int taskId, BlockingQueue<GeodeEvent> eventBuffer, List<LocatorHostPort> locators, List<String> regionNames, String durableClientId, String durableClientTimeout, String cqPrefix) {
boolean isDurable = isDurable(durableClientId);
clientCache = createClientCache(locators, durableClientId, durableClientTimeout);
for (String region : regionNames) {
installListenersToRegion(taskId, eventBuffer, region, cqPrefix, isDurable);
}
if (isDurable) {
clientCache.readyForEvents();
}
}
void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
CqAttributes cqAttributes = cqAttributesFactory.create();
try {
clientCache.getQueryService().newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
isDurable).execute();
} catch (CqExistsException e) {
System.out.println("UHH");
e.printStackTrace();
} catch (CqException | RegionNotFoundException e) {
System.out.println("UHH e");
e.printStackTrace();
} catch (Exception e) {
System.out.println("UHHHHHH " + e);
}
}
List<String> parseNames(String names) {
return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
}
List<LocatorHostPort> parseLocators(String locators) {
return Arrays.stream(locators.split(",")).map((s) -> {
String locatorString = s.trim();
return parseLocator(locatorString);
}).collect(Collectors.toList());
}
private LocatorHostPort parseLocator(String locatorString) {
String[] splits = locatorString.split("\\[");
String locator = splits[0];
int port = Integer.parseInt(splits[1].replace("]", ""));
return new LocatorHostPort(locator, port);
}
boolean isDurable(String durableClientId) {
return !durableClientId.equals("");
}
String generateCqName(int taskId, String cqPrefix, String regionName) {
return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
}
/**
* converts a list of regions names into a map of source partitions
*
* @param regionNames list of regionNames
* @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
*/
Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) {
return regionNames.stream().map(regionName -> {
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put(REGION_NAME, regionName);
return sourcePartition;
}).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s));
}
}