Added GeodeContext (for reals this time)
Added GeodeConnectorConfigTest
diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java
new file mode 100644
index 0000000..4582b93
--- /dev/null
+++ b/src/main/java/geode/kafka/GeodeContext.java
@@ -0,0 +1,47 @@
+package geode.kafka;
+
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqException;
+import org.apache.geode.cache.query.CqExistsException;
+import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.RegionNotFoundException;
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.List;
+
+public class GeodeContext {
+
+    private ClientCache clientCache;
+
+
+    public GeodeContext(GeodeConnectorConfig connectorConfig) {
+        clientCache = createClientCache(connectorConfig.getLocatorHostPorts(), connectorConfig.getDurableClientId(), connectorConfig.getDurableClientTimeout());
+    }
+
+    public ClientCache getClientCache() {
+        return clientCache;
+    }
+
+    public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
+        ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName)
+                .set("durable-client-timeout", durableClientTimeOut)
+                .setPoolSubscriptionEnabled(true);
+        for (LocatorHostPort locator: locators) {
+            ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create();
+        }
+        return ccf.create();
+    }
+
+    public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable) throws ConnectException {
+        try {
+            CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
+            cq.execute();
+            return cq;
+        } catch (RegionNotFoundException | CqException | CqExistsException e) {
+            e.printStackTrace();
+            throw new ConnectException(e);
+        }
+    }
+}
diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
new file mode 100644
index 0000000..54f9e52
--- /dev/null
+++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
@@ -0,0 +1,44 @@
+package geode.kafka;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class GeodeConnectorConfigTest {
+
+    @Test
+    public void parseRegionNamesShouldSplitOnComma() {
+        GeodeConnectorConfig config = new GeodeConnectorConfig();
+        List<String> regionNames = config.parseNames("region1,region2,region3,region4");
+        assertEquals(4, regionNames.size());
+        assertThat(true, allOf(is(regionNames.contains("region1"))
+                , is(regionNames.contains("region2"))
+                , is(regionNames.contains("region3"))
+                , is(regionNames.contains("region4"))));
+    }
+
+    @Test
+    public void parseRegionNamesShouldChomp() {
+        GeodeConnectorConfig config = new GeodeConnectorConfig();
+        List<String> regionNames = config.parseNames("region1, region2, region3,region4");
+        assertEquals(4, regionNames.size());
+        assertThat(true, allOf(is(regionNames instanceof List)
+                , is(regionNames.contains("region1"))
+                , is(regionNames.contains("region2"))
+                , is(regionNames.contains("region3"))
+                , is(regionNames.contains("region4"))));
+    }
+
+    @Test
+    public void shouldBeAbleToParseGeodeLocatorStrings() {
+        GeodeConnectorConfig config = new GeodeConnectorConfig();
+        String locatorString="localhost[8888], localhost[8881]";
+        List<LocatorHostPort> locators = config.parseLocators(locatorString);
+        assertThat(2, is(locators.size()));
+    }
+}