Added GeodeContext
Added GeodeConnectorConfigTest
diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java
new file mode 100644
index 0000000..4582b93
--- /dev/null
+++ b/src/main/java/geode/kafka/GeodeContext.java
@@ -0,0 +1,47 @@
+package geode.kafka;
+
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqException;
+import org.apache.geode.cache.query.CqExistsException;
+import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.RegionNotFoundException;
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.List;
+
+public class GeodeContext {
+
+    private ClientCache clientCache;
+
+    /** Connects a durable ClientCache using the connector's locator and durable-client settings. */
+    public GeodeContext(GeodeConnectorConfig connectorConfig) {
+        clientCache = createClientCache(connectorConfig.getLocatorHostPorts(), connectorConfig.getDurableClientId(), connectorConfig.getDurableClientTimeout());
+    }
+    /** @return the ClientCache created at construction time */
+    public ClientCache getClientCache() {
+        return clientCache;
+    }
+    /** Builds a subscription-enabled durable ClientCache registered with every supplied locator. */
+    public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
+        ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName)
+                .set("durable-client-timeout", durableClientTimeOut)
+                .setPoolSubscriptionEnabled(true);
+        for (LocatorHostPort locator: locators) {
+            ccf.addPoolLocator(locator.getHostName(), locator.getPort());
+        }
+        return ccf.create();
+    }
+    /** Registers and immediately executes a new continuous query on the cache's query service. */
+    public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable) throws ConnectException {
+        try {
+            CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
+            cq.execute();
+            return cq;
+        } catch (RegionNotFoundException | CqException | CqExistsException e) {
+            // Rethrow for Kafka Connect to handle; ConnectException(e) preserves the full cause.
+            throw new ConnectException(e);
+        }
+    }
+}
diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
new file mode 100644
index 0000000..54f9e52
--- /dev/null
+++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
@@ -0,0 +1,44 @@
+package geode.kafka;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class GeodeConnectorConfigTest {
+
+    @Test
+    public void parseRegionNamesShouldSplitOnComma() {
+        GeodeConnectorConfig config = new GeodeConnectorConfig();
+        List<String> regionNames = config.parseNames("region1,region2,region3,region4");
+        assertEquals(4, regionNames.size());
+        assertThat(true, allOf(is(regionNames.contains("region1"))
+                , is(regionNames.contains("region2"))
+                , is(regionNames.contains("region3"))
+                , is(regionNames.contains("region4"))));
+    }
+    // Whitespace around the comma separators must be stripped ("chomped").
+    @Test
+    public void parseRegionNamesShouldChomp() {
+        GeodeConnectorConfig config = new GeodeConnectorConfig();
+        List<String> regionNames = config.parseNames("region1, region2, region3,region4");
+        assertEquals(4, regionNames.size());
+        assertThat(true, allOf(is(!regionNames.contains(" region2"))
+                , is(regionNames.contains("region1"))
+                , is(regionNames.contains("region2"))
+                , is(regionNames.contains("region3"))
+                , is(regionNames.contains("region4"))));
+    }
+    // Locator strings use Geode's "host[port]" format, comma separated.
+    @Test
+    public void shouldBeAbleToParseGeodeLocatorStrings() {
+        GeodeConnectorConfig config = new GeodeConnectorConfig();
+        String locatorString="localhost[8888], localhost[8881]";
+        List<LocatorHostPort> locators = config.parseLocators(locatorString);
+        assertThat(locators.size(), is(2));
+    }
+}