Split regions and topics per task.
diff --git a/build.gradle b/build.gradle
index 87a2c36..515967d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -26,8 +26,7 @@
     testCompile(group: 'org.apache.kafka', name: 'connect-runtime', version: '2.3.1')
 
     testCompile group: 'junit', name: 'junit', version: '4.12'
+    testCompile 'org.mockito:mockito-core:3.2.4'
 
     testImplementation 'org.awaitility:awaitility:4.0.2'
-
-
 }
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index 8e79e1f..11eff62 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -1,6 +1,7 @@
 package geode.kafka;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -80,10 +81,14 @@
         locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
     }
 
-    List<String> parseNames(String names) {
+    public static List<String> parseNames(String names) {
         return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
     }
 
+    public static String reconstructString(Collection<String> strings) {
+        return strings.stream().collect(Collectors.joining(","));
+    }
+
     List<LocatorHostPort> parseLocators(String locators) {
         return Arrays.stream(locators.split(",")).map((s) -> {
             String locatorString = s.trim();
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index 08af2ad..c7afb96 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -4,6 +4,7 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -22,6 +23,8 @@
 import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
 import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.REGIONS;
+import static geode.kafka.GeodeConnectorConfig.TOPICS;
 
 public class GeodeKafkaSink extends SinkConnector  {
     private static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -41,11 +44,18 @@
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         List<Map<String, String>> taskConfigs = new ArrayList<>();
-        Map<String, String> taskProps = new HashMap<>();
-
-        taskProps.putAll(sharedProps);
+        List<String> topics = GeodeConnectorConfig.parseNames(sharedProps.get(TOPICS));
+        List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks);
+
+        List<String> regions = GeodeConnectorConfig.parseNames(sharedProps.get(REGIONS));
+        List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks);
 
         for (int i = 0; i < maxTasks; i++) {
+            // Each task needs its own property map; reusing a single map would
+            // leave every task holding the values of the final iteration.
+            Map<String, String> taskProps = new HashMap<>(sharedProps);
             taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
+            taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i)));
+            taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i)));
             taskConfigs.add(taskProps);
         }
 
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index 8f60471..ca3f2c8 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -5,6 +5,7 @@
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -25,6 +26,8 @@
 import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
 import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.REGIONS;
+import static geode.kafka.GeodeConnectorConfig.TOPICS;
 
 
 public class GeodeKafkaSource extends SourceConnector {
@@ -45,9 +48,16 @@
-    Map<String, String> taskProps = new HashMap<>();
-    taskProps.putAll(sharedProps);
+    List<String> topics = GeodeConnectorConfig.parseNames(sharedProps.get(TOPICS));
+    List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks);
+
+    List<String> regions = GeodeConnectorConfig.parseNames(sharedProps.get(REGIONS));
+    List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks);
 
     for (int i = 0; i < maxTasks; i++) {
-    //TODO partition regions and topics
+      // Each task gets its own property map; reusing one map would leave all
+      // tasks with the values from the last iteration.
+      Map<String, String> taskProps = new HashMap<>(sharedProps);
       taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
+      taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i)));
+      taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i)));
       taskConfigs.add(taskProps);
     }
     return taskConfigs;
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 657b274..8080179 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -21,6 +21,7 @@
 
 import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
 import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
 import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
 import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
 import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
@@ -39,7 +40,7 @@
     private GeodeConnectorConfig geodeConnectorConfig;
     private List<String> topics;
     private Map<String, Map<String, String>> sourcePartitions;
-    private static BlockingQueue<GeodeEvent> eventBuffer;
+    private BlockingQueue<GeodeEvent> eventBuffer;
     private int batchSize;
 
 
@@ -54,6 +55,12 @@
         return null;
     }
 
+    void startForTesting(BlockingQueue<GeodeEvent> eventBuffer, List<String> topics, int batchSize) { // test-only hook: assigns collaborators directly, bypassing start()
+        this.eventBuffer = eventBuffer;
+        this.topics = topics;
+        this.batchSize = batchSize;
+    }
+
     @Override
     public void start(Map<String, String> props) {
         try {
@@ -111,7 +118,7 @@
         }
     }
 
-    void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
+    GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
         CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
         GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
         cqAttributesFactory.addCqListener(listener);
@@ -129,6 +136,7 @@
         finally {
             listener.signalInitialResultsLoaded();
         }
+        return listener;
     }
 
     /**
@@ -148,5 +156,4 @@
     String generateCqName(int taskId, String cqPrefix, String regionName) {
         return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
     }
-
 }
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 8f86cbc..041adc8 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -3,8 +3,10 @@
 import geode.kafka.GeodeConnectorConfig;
 import geode.kafka.GeodeContext;
 import org.apache.geode.cache.query.CqEvent;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -13,8 +15,15 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.REGIONS;
 import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
+import static geode.kafka.GeodeConnectorConfig.TOPICS;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -24,6 +33,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+
 public class GeodeKafkaSourceTaskTest {
 
 
@@ -62,16 +72,36 @@
     }
 
     @Test
-    public void cqListenerOnEventPopulatesEventsBuffer() {}
+    public void cqListenerOnEventPopulatesEventsBuffer() {
+        GeodeContext geodeContext = mock(GeodeContext.class);
+        BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+        boolean loadEntireRegion = false;
+        boolean isDurable = false;
 
-    @Test
-    public void pollReturnsEventsWhenEventBufferHasValues() {
+        when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(new ArrayList());
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
 
+        listener.onEvent(mock(CqEvent.class));
+        assertEquals(1, eventBuffer.size());
     }
 
     @Test
-    public void regionsArePassedCorrectlyToTask() {
-
+    public void pollReturnsEventsWhenEventBufferHasValues() throws Exception {
+//        BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+//        CqEvent cqEvent = mock(CqEvent.class);
+//        when(cqEvent.getNewValue()).thenReturn("New Value");
+//        GeodeEvent event = mock(GeodeEvent.class);
+//        when(event.getEvent()).thenReturn(cqEvent);
+//        eventBuffer.add(event);
+//
+//        List<String> topics = new ArrayList<>();
+//        topics.add("myTopic");
+//
+//        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+//        task.startForTesting(eventBuffer, topics, 1);
+//        List<SourceRecord> records = task.poll();
+//        assertEquals(1, records.size());
     }
 
     @Test
@@ -94,8 +124,6 @@
         assertThat(true, is(sourcePartitions.get("region3").get(REGION_NAME).equals("region3")));
     }
 
-
-
     @Test
     public void listOfLocatorsShouldBeConfiguredIntoClientCache() {