Added GeodeContext to handle cache related operations
Added GeodeConnectorConfig for shared configs
Moved packages
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
new file mode 100644
index 0000000..9ac561f
--- /dev/null
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -0,0 +1,132 @@
+package geode.kafka;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class GeodeConnectorConfig {
+
+ //Geode Configuration
+ public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
+ public static final String DEFAULT_DURABLE_CLIENT_ID = "";
+ public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
+ public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
+
+
+ //GeodeKafka Specific Configuration
+ public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task
+
+ public static final String CQ_PREFIX = "cqPrefix";
+ public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
+ /**
+ * Specifies which Locators to connect to Apache Geode
+ */
+ public static final String LOCATORS = "locators";
+ public static final String DEFAULT_LOCATOR = "localhost[10334]";
+
+ /**
+ * Specifies which Regions to connect in Apache Geode
+ */
+ public static final String REGIONS = "regions";
+
+ /**
+ * Specifies which Topics to connect in Kafka
+ */
+ public static final String TOPICS = "topics";
+
+ /**
+ * Property to describe the Source Partition in a record
+ */
+ public static final String REGION_NAME = "regionName"; //used for Source Partition Events
+
+ public static final String BATCH_SIZE = "geodeConnectorBatchSize";
+ public static final String DEFAULT_BATCH_SIZE = "100";
+
+ public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
+ public static final String DEFAULT_QUEUE_SIZE = "100000";
+
+
+
+ private final int taskId;
+ private final String durableClientId;
+ private final String durableClientIdPrefix;
+ private final String durableClientTimeout;
+ private List<String> regionNames;
+ private List<String> topics;
+ private List<LocatorHostPort> locatorHostPorts;
+
+ //just for tests
+ GeodeConnectorConfig() {
+ taskId = 0;
+ durableClientId = "";
+ durableClientIdPrefix = "";
+ durableClientTimeout = "0";
+ }
+
+ public GeodeConnectorConfig(Map<String, String> connectorProperties) {
+ taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
+ durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
+ if (isDurable(durableClientIdPrefix)) {
+ durableClientId = durableClientIdPrefix + taskId;
+ }
+ else {
+ durableClientId = "";
+ }
+ durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
+ regionNames = parseNames(connectorProperties.get(GeodeConnectorConfig.REGIONS));
+ topics = parseNames(connectorProperties.get(GeodeConnectorConfig.TOPICS));
+ locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
+ }
+
+ List<String> parseNames(String names) {
+ return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
+ }
+
+ List<LocatorHostPort> parseLocators(String locators) {
+ return Arrays.stream(locators.split(",")).map((s) -> {
+ String locatorString = s.trim();
+ return parseLocator(locatorString);
+ }).collect(Collectors.toList());
+ }
+
+ private LocatorHostPort parseLocator(String locatorString) {
+ String[] splits = locatorString.split("\\[");
+ String locator = splits[0];
+ int port = Integer.parseInt(splits[1].replace("]", ""));
+ return new LocatorHostPort(locator, port);
+ }
+
+ public boolean isDurable() {
+ return isDurable(durableClientId);
+ }
+
+ /**
+ * @param durableClientId or prefix can be passed in. Either both will be "" or both will have a value
+ * @return
+ */
+ boolean isDurable(String durableClientId) {
+ return !durableClientId.equals("");
+ }
+
+
+ public String getDurableClientId() {
+ return durableClientId;
+ }
+
+ public String getDurableClientTimeout() {
+ return durableClientTimeout;
+ }
+
+ public List<String> getRegionNames() {
+ return regionNames;
+ }
+
+ public List<String> getTopics() {
+ return topics;
+ }
+
+ public List<LocatorHostPort> getLocatorHostPorts() {
+ return locatorHostPorts;
+ }
+}
diff --git a/src/main/java/kafka/LocatorHostPort.java b/src/main/java/geode/kafka/LocatorHostPort.java
similarity index 94%
rename from src/main/java/kafka/LocatorHostPort.java
rename to src/main/java/geode/kafka/LocatorHostPort.java
index 517bad7..50d7440 100644
--- a/src/main/java/kafka/LocatorHostPort.java
+++ b/src/main/java/geode/kafka/LocatorHostPort.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
public class LocatorHostPort {
diff --git a/src/main/java/kafka/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
similarity index 98%
rename from src/main/java/kafka/GeodeKafkaSink.java
rename to src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index 67a244e..68460e4 100644
--- a/src/main/java/kafka/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka.sink;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
diff --git a/src/main/java/kafka/GeodeEvent.java b/src/main/java/geode/kafka/source/GeodeEvent.java
similarity index 94%
rename from src/main/java/kafka/GeodeEvent.java
rename to src/main/java/geode/kafka/source/GeodeEvent.java
index 805f4a0..41e37c6 100644
--- a/src/main/java/kafka/GeodeEvent.java
+++ b/src/main/java/geode/kafka/source/GeodeEvent.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka.source;
import org.apache.geode.cache.query.CqEvent;
diff --git a/src/main/java/kafka/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
similarity index 64%
rename from src/main/java/kafka/GeodeKafkaSource.java
rename to src/main/java/geode/kafka/source/GeodeKafkaSource.java
index 95afc50..d5da62a 100644
--- a/src/main/java/kafka/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -1,5 +1,6 @@
-package kafka;
+package geode.kafka.source;
+import geode.kafka.GeodeConnectorConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
@@ -10,21 +11,19 @@
import java.util.List;
import java.util.Map;
-import static kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
-import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
-import static kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
-import static kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
-import static kafka.GeodeConnectorConfig.LOCATORS;
-import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static kafka.GeodeConnectorConfig.REGIONS;
-import static kafka.GeodeConnectorConfig.TOPICS;
-import static kafka.GeodeKafkaSourceTask.TASK_ID;
+import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
+
public class GeodeKafkaSource extends SourceConnector {
@@ -39,16 +38,14 @@
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
- System.out.println("GKSource: taskConfigs");
List<Map<String, String>> taskConfigs = new ArrayList<>();
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(sharedProps);
- // use the same props for all tasks at the moment
for (int i = 0; i < maxTasks; i++) {
//TODO partition regions and topics
- taskProps.put(TASK_ID, "" + i);
+ taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
taskConfigs.add(taskProps);
}
return taskConfigs;
@@ -82,6 +79,7 @@
@Override
public String version() {
+ //TODO
return AppInfoParser.getVersion();
}
diff --git a/src/main/java/kafka/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
similarity index 95%
rename from src/main/java/kafka/GeodeKafkaSourceListener.java
rename to src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index 4c0e729..c4d6b22 100644
--- a/src/main/java/kafka/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -1,7 +1,6 @@
-package kafka;
+package geode.kafka.source;
import org.apache.geode.cache.query.CqEvent;
-import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqStatusListener;
import java.util.concurrent.BlockingQueue;
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
new file mode 100644
index 0000000..23fa141
--- /dev/null
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -0,0 +1,133 @@
+package geode.kafka.source;
+
+import geode.kafka.GeodeContext;
+import geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
+
+public class GeodeKafkaSourceTask extends SourceTask {
+
+ private static final String TASK_PREFIX = "TASK";
+ private static final String DOT = ".";
+
+ //property string to pass in to identify task
+ private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
+
+ private int taskId;
+ private GeodeContext geodeContext;
+ private List<String> topics;
+
+ private Map<String, Map<String, String>> sourcePartitions;
+ private static BlockingQueue<GeodeEvent> eventBuffer;
+ private int batchSize;
+
+
+ private static Map<String, Long> createOffset() {
+ Map<String, Long> offset = new HashMap<>();
+ offset.put("OFFSET", 0L);
+ return offset;
+ }
+
+ @Override
+ public String version() {
+ return null;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ try {
+ GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props);
+ geodeContext = new GeodeContext(geodeConnectorConfig);
+
+ batchSize = Integer.parseInt(props.get(BATCH_SIZE));
+ int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
+ eventBuffer = new LinkedBlockingQueue<>(queueSize);
+
+ sourcePartitions = createSourcePartitionsMap(geodeConnectorConfig.getRegionNames());
+ topics = geodeConnectorConfig.getTopics();
+
+ String cqPrefix = props.get(CQ_PREFIX);
+
+ installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix);
+ }
+ catch (Exception e) {
+ System.out.println("Exception:" + e);
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @Override
+ public List<SourceRecord> poll() throws InterruptedException {
+ ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
+ ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
+ if (eventBuffer.drainTo(events, batchSize) > 0) {
+ for (GeodeEvent event : events) {
+ for (String topic : topics) {
+ records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
+ }
+ }
+
+ return records;
+ }
+
+ return null;
+ }
+
+ @Override
+ public void stop() {
+ geodeContext.getClientCache().close(true);
+ }
+
+ void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix) {
+ boolean isDurable = geodeConnectorConfig.isDurable();
+ for (String region : geodeConnectorConfig.getRegionNames()) {
+ installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, isDurable);
+ }
+ if (isDurable) {
+ geodeContext.getClientCache().readyForEvents();
+ }
+ }
+
+ void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
+ CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
+ cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
+ CqAttributes cqAttributes = cqAttributesFactory.create();
+ geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
+ isDurable);
+ }
+
+ /**
+ * converts a list of regions names into a map of source partitions
+ *
+ * @param regionNames list of regionNames
+ * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
+ */
+ Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) {
+ return regionNames.stream().map(regionName -> {
+ Map<String, String> sourcePartition = new HashMap<>();
+ sourcePartition.put(REGION_NAME, regionName);
+ return sourcePartition;
+ }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s));
+ }
+
+ String generateCqName(int taskId, String cqPrefix, String regionName) {
+ return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
+ }
+
+}
diff --git a/src/main/java/kafka/GeodeConnectorConfig.java b/src/main/java/kafka/GeodeConnectorConfig.java
deleted file mode 100644
index 003367d..0000000
--- a/src/main/java/kafka/GeodeConnectorConfig.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package kafka;
-
-public class GeodeConnectorConfig {
-
- //Geode Configuration
- public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
- public static final String DEFAULT_DURABLE_CLIENT_ID = "";
- public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
- public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
-
-
- //GeodeKafka Specific Configuration
- public static final String CQ_PREFIX = "cqPrefix";
- public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
- /**
- * Specifies which Locators to connect to Apache Geode
- */
- public static final String LOCATORS = "locators";
- public static final String DEFAULT_LOCATOR = "localhost[10334]";
-
- /**
- * Specifies which Regions to connect in Apache Geode
- */
- public static final String REGIONS = "regions";
-
- /**
- * Specifies which Topics to connect in Kafka
- */
- public static final String TOPICS = "topics";
-
- /**
- * Property to describe the Source Partition in a record
- */
- public static final String REGION_NAME = "regionName"; //used for Source Partition Events
-
- public static final String BATCH_SIZE = "geodeConnectorBatchSize";
- public static final String DEFAULT_BATCH_SIZE = "100";
-
- public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
- public static final String DEFAULT_QUEUE_SIZE = "100000";
-}
diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java
deleted file mode 100644
index 896b602..0000000
--- a/src/main/java/kafka/GeodeKafkaSourceTask.java
+++ /dev/null
@@ -1,196 +0,0 @@
-package kafka;
-
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.query.CqAttributes;
-import org.apache.geode.cache.query.CqAttributesFactory;
-import org.apache.geode.cache.query.CqException;
-import org.apache.geode.cache.query.CqExistsException;
-import org.apache.geode.cache.query.RegionNotFoundException;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.source.SourceTask;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-
-import static kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
-import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static kafka.GeodeConnectorConfig.REGION_NAME;
-
-public class GeodeKafkaSourceTask extends SourceTask {
-
- //property string to pass in to identify task
- public static final String TASK_ID = "GEODE_TASK_ID";
- private static final String TASK_PREFIX = "TASK";
- private static final String DOT = ".";
- private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
-
- private int taskId;
- private ClientCache clientCache;
- private List<String> regionNames;
- private List<String> topics;
- private Map<String, Map<String, String>> sourcePartitions;
- private static BlockingQueue<GeodeEvent> eventBuffer;
- private int batchSize;
-
-
- private static Map<String, Long> createOffset() {
- Map<String, Long> offset = new HashMap<>();
- offset.put("OFFSET", 0L);
- return offset;
- }
-
- @Override
- public String version() {
- return null;
- }
-
- @Override
- public void start(Map<String, String> props) {
- try {
- System.out.println("JASON task start");
- taskId = Integer.parseInt(props.get(TASK_ID));
- batchSize = Integer.parseInt(props.get(BATCH_SIZE));
- int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
- eventBuffer = new LinkedBlockingQueue<>(queueSize);
-
- //grouping will be done in the source and not the task
- regionNames = parseNames(props.get(GeodeConnectorConfig.REGIONS));
- topics = parseNames(props.get(GeodeConnectorConfig.TOPICS));
- sourcePartitions = createSourcePartitionsMap(regionNames);
-
- String durableClientId = props.get(DURABLE_CLIENT_ID_PREFIX);
- if (!durableClientId.equals("")) {
- durableClientId += taskId;
- }
- System.out.println("JASON durable client id is:" + durableClientId);
- String durableClientTimeout = props.get(DURABLE_CLIENT_TIME_OUT);
- String cqPrefix = props.get(CQ_PREFIX);
-
- List<LocatorHostPort> locators = parseLocators(props.get(GeodeConnectorConfig.LOCATORS));
- installOnGeode(taskId, eventBuffer, locators, regionNames, durableClientId, durableClientTimeout, cqPrefix);
- System.out.println("JASON task start finished");
- }
- catch (Exception e) {
- System.out.println("Exception:" + e);
- e.printStackTrace();
- throw e;
- }
- }
-
- @Override
- public List<SourceRecord> poll() throws InterruptedException {
- ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
- ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
- if (eventBuffer.drainTo(events, batchSize) > 0) {
- for (GeodeEvent event : events) {
- for (String topic : topics) {
- records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
-// records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, "STRING"));
- }
- }
-
- return records;
- }
-
- return null;
- }
-
- @Override
- public void stop() {
- clientCache.close(true);
- }
-
- ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
- ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName)
- .set("durable-client-timeout", durableClientTimeOut)
- .setPoolSubscriptionEnabled(true);
- for (LocatorHostPort locator: locators) {
- ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create();
- }
- return ccf.create();
- }
-
- void installOnGeode(int taskId, BlockingQueue<GeodeEvent> eventBuffer, List<LocatorHostPort> locators, List<String> regionNames, String durableClientId, String durableClientTimeout, String cqPrefix) {
- boolean isDurable = isDurable(durableClientId);
-
- clientCache = createClientCache(locators, durableClientId, durableClientTimeout);
- for (String region : regionNames) {
- installListenersToRegion(taskId, eventBuffer, region, cqPrefix, isDurable);
- }
- if (isDurable) {
- clientCache.readyForEvents();
- }
- }
-
- void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
- CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
- cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
- CqAttributes cqAttributes = cqAttributesFactory.create();
- try {
- clientCache.getQueryService().newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
- isDurable).execute();
- } catch (CqExistsException e) {
- System.out.println("UHH");
- e.printStackTrace();
- } catch (CqException | RegionNotFoundException e) {
- System.out.println("UHH e");
- e.printStackTrace();
- } catch (Exception e) {
- System.out.println("UHHHHHH " + e);
- }
- }
-
-
- List<String> parseNames(String names) {
- return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
- }
-
- List<LocatorHostPort> parseLocators(String locators) {
- return Arrays.stream(locators.split(",")).map((s) -> {
- String locatorString = s.trim();
- return parseLocator(locatorString);
- }).collect(Collectors.toList());
- }
-
- private LocatorHostPort parseLocator(String locatorString) {
- String[] splits = locatorString.split("\\[");
- String locator = splits[0];
- int port = Integer.parseInt(splits[1].replace("]", ""));
- return new LocatorHostPort(locator, port);
- }
-
- boolean isDurable(String durableClientId) {
- return !durableClientId.equals("");
- }
-
- String generateCqName(int taskId, String cqPrefix, String regionName) {
- return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
- }
-
- /**
- * converts a list of regions names into a map of source partitions
- *
- * @param regionNames list of regionNames
- * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
- */
- Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) {
- return regionNames.stream().map(regionName -> {
- Map<String, String> sourcePartition = new HashMap<>();
- sourcePartition.put(REGION_NAME, regionName);
- return sourcePartition;
- }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s));
- }
-}
diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
similarity index 98%
rename from src/test/java/kafka/GeodeKafkaTestCluster.java
rename to src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index 85c0fc4..f1a6dff 100644
--- a/src/test/java/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -1,6 +1,7 @@
-package kafka;
+package geode.kafka;
import kafka.admin.RackAwareMode;
+import geode.kafka.source.GeodeKafkaSource;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.geode.cache.Region;
diff --git a/src/test/java/kafka/GeodeLocalCluster.java b/src/test/java/geode/kafka/GeodeLocalCluster.java
similarity index 97%
rename from src/test/java/kafka/GeodeLocalCluster.java
rename to src/test/java/geode/kafka/GeodeLocalCluster.java
index 43ac8f5..afd3b9d 100644
--- a/src/test/java/kafka/GeodeLocalCluster.java
+++ b/src/test/java/geode/kafka/GeodeLocalCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
import java.io.IOException;
diff --git a/src/test/java/kafka/JavaProcess.java b/src/test/java/geode/kafka/JavaProcess.java
similarity index 97%
rename from src/test/java/kafka/JavaProcess.java
rename to src/test/java/geode/kafka/JavaProcess.java
index 30edfef..fe00094 100644
--- a/src/test/java/kafka/JavaProcess.java
+++ b/src/test/java/geode/kafka/JavaProcess.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
import java.io.File;
import java.io.IOException;
diff --git a/src/test/java/kafka/KafkaLocalCluster.java b/src/test/java/geode/kafka/KafkaLocalCluster.java
similarity index 96%
rename from src/test/java/kafka/KafkaLocalCluster.java
rename to src/test/java/geode/kafka/KafkaLocalCluster.java
index cd2a3df..57f16f4 100644
--- a/src/test/java/kafka/KafkaLocalCluster.java
+++ b/src/test/java/geode/kafka/KafkaLocalCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
diff --git a/src/test/java/kafka/LocatorLauncherWrapper.java b/src/test/java/geode/kafka/LocatorLauncherWrapper.java
similarity index 98%
rename from src/test/java/kafka/LocatorLauncherWrapper.java
rename to src/test/java/geode/kafka/LocatorLauncherWrapper.java
index c1a7075..57ff405 100644
--- a/src/test/java/kafka/LocatorLauncherWrapper.java
+++ b/src/test/java/geode/kafka/LocatorLauncherWrapper.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.Locator;
diff --git a/src/test/java/kafka/ServerLauncherWrapper.java b/src/test/java/geode/kafka/ServerLauncherWrapper.java
similarity index 98%
rename from src/test/java/kafka/ServerLauncherWrapper.java
rename to src/test/java/geode/kafka/ServerLauncherWrapper.java
index 933824e..b36a3aa 100644
--- a/src/test/java/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/geode/kafka/ServerLauncherWrapper.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
diff --git a/src/test/java/kafka/WorkerAndHerderCluster.java b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
similarity index 95%
rename from src/test/java/kafka/WorkerAndHerderCluster.java
rename to src/test/java/geode/kafka/WorkerAndHerderCluster.java
index 7357232..c347946 100644
--- a/src/test/java/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
import java.io.IOException;
diff --git a/src/test/java/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
similarity index 88%
rename from src/test/java/kafka/WorkerAndHerderWrapper.java
rename to src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index 5f8ccd2..cc8e27b 100644
--- a/src/test/java/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -1,7 +1,6 @@
-package kafka;
+package geode.kafka;
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.Locator;
+import geode.kafka.source.GeodeKafkaSource;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -14,16 +13,14 @@
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
-import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
-import static kafka.GeodeConnectorConfig.REGIONS;
-import static kafka.GeodeConnectorConfig.TOPICS;
-import static kafka.GeodeKafkaTestCluster.TEST_REGIONS;
-import static kafka.GeodeKafkaTestCluster.TEST_TOPICS;
+import static geode.kafka.GeodeConnectorConfig.REGIONS;
+import static geode.kafka.GeodeConnectorConfig.TOPICS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGIONS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPICS;
public class WorkerAndHerderWrapper {
diff --git a/src/test/java/kafka/ZooKeeperLocalCluster.java b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
similarity index 98%
rename from src/test/java/kafka/ZooKeeperLocalCluster.java
rename to src/test/java/geode/kafka/ZooKeeperLocalCluster.java
index 8b23f53..a3d3433 100644
--- a/src/test/java/kafka/ZooKeeperLocalCluster.java
+++ b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
new file mode 100644
index 0000000..33c260d
--- /dev/null
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -0,0 +1,85 @@
+package geode.kafka.source;
+
+import geode.kafka.GeodeConnectorConfig;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class GeodeKafkaSourceTaskTest {
+
+ @Test
+ public void cqListenerOnEventPopulatesEventsBuffer() {
+
+ }
+
+ @Test
+ public void pollReturnsEventsWhenEventBufferHasValues() {
+
+ }
+
+ @Test
+ public void regionsArePassedCorrectlyToTask() {
+
+ }
+
+ @Test
+ public void installOnGeodeShouldCallCq() {
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ }
+
+
+
+
+
+ @Test
+ public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ List<String> regionNames = Arrays.asList(new String[]{"region1", "region2", "region3"});
+ Map<String, Map<String,String>> sourcePartitions = task.createSourcePartitionsMap(regionNames);
+ assertThat(3, is(sourcePartitions.size()));
+ assertThat(true, is(sourcePartitions.get("region1").get(REGION_NAME).equals("region1")));
+ assertThat(true, is(sourcePartitions.get("region2").get(REGION_NAME).equals("region2")));
+ assertThat(true, is(sourcePartitions.get("region3").get(REGION_NAME).equals("region3")));
+ }
+
+
+
+ @Test
+ public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
+
+ }
+
+ @Test
+ public void shouldNotBeDurableIfDurableClientIdIsNull() {
+
+ }
+
+ @Test
+ public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() {
+
+ }
+
+ //Source properties tests
+ @Test
+ public void propertiesShouldBeCorrectlyTranslatedToConfiguration() {
+ Map<String, String> props = new HashMap<>();
+ props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE);
+ props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
+
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ task.start(props);
+
+// assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE));
+
+
+ }
+
+
+}
diff --git a/src/test/java/kafka/GeodeKafkaSourceTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
similarity index 93%
rename from src/test/java/kafka/GeodeKafkaSourceTest.java
rename to src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
index ec6dff8..717d495 100644
--- a/src/test/java/kafka/GeodeKafkaSourceTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka.source;
import org.junit.Test;
diff --git a/src/test/java/kafka/GeodeKafkaSourceTaskTest.java b/src/test/java/kafka/GeodeKafkaSourceTaskTest.java
deleted file mode 100644
index 2c15664..0000000
--- a/src/test/java/kafka/GeodeKafkaSourceTaskTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package kafka;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static kafka.GeodeConnectorConfig.REGION_NAME;
-import static org.hamcrest.CoreMatchers.allOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-public class GeodeKafkaSourceTaskTest {
-
- @Test
- public void cqListenerOnEventPopulatesEventsBuffer() {
-
- }
-
- @Test
- public void pollReturnsEventsWhenEventBufferHasValues() {
-
- }
-
- @Test
- public void regionsArePassedCorrectlyToTask() {
-
- }
-
- @Test
- public void installOnGeodeShouldCallCq() {
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- }
-
-
-
- @Test
- public void parseRegionNamesShouldSplitOnComma() {
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- List<String> regionNames = task.parseNames("region1,region2,region3,region4");
- assertEquals(4, regionNames.size());
- assertThat(true, allOf(is(regionNames.contains("region1"))
- , is(regionNames.contains("region2"))
- , is(regionNames.contains("region3"))
- , is(regionNames.contains("region4"))));
- }
-
- @Test
- public void parseRegionNamesShouldChomp() {
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- List<String> regionNames = task.parseNames("region1, region2, region3,region4");
- assertEquals(4, regionNames.size());
- assertThat(true, allOf(is(regionNames instanceof List)
- , is(regionNames.contains("region1"))
- , is(regionNames.contains("region2"))
- , is(regionNames.contains("region3"))
- , is(regionNames.contains("region4"))));
- }
-
- @Test
- public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- List<String> regionNames = Arrays.asList(new String[]{"region1", "region2", "region3"});
- Map<String, Map<String,String>> sourcePartitions = task.createSourcePartitionsMap(regionNames);
- assertThat(3, is(sourcePartitions.size()));
- assertThat(true, is(sourcePartitions.get("region1").get(REGION_NAME).equals("region1")));
- assertThat(true, is(sourcePartitions.get("region2").get(REGION_NAME).equals("region2")));
- assertThat(true, is(sourcePartitions.get("region3").get(REGION_NAME).equals("region3")));
- }
-
- @Test
- public void shouldBeAbleToParseGeodeLocatorStrings() {
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- String locatorString="localhost[8888], localhost[8881]";
- List<LocatorHostPort> locators = task.parseLocators(locatorString);
- assertThat(2, is(locators.size()));
- }
-
- @Test
- public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
-
- }
-
- @Test
- public void shouldNotBeDurableIfDurableClientIdIsNull() {
-
- }
-
- @Test
- public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() {
-
- }
-
- //Source properties tests
- @Test
- public void propertiesShouldBeCorrectlyTranslatedToConfiguration() {
- Map<String, String> props = new HashMap<>();
- props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE);
- props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
-
-
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.start(props);
-
-// assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE));
-
-
- }
-
-
-}