Add support for loading the entire region via CQ initial results
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index 4f75ec0..8e79e1f 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -13,7 +13,6 @@
     public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
     public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
 
-
     //GeodeKafka Specific Configuration
     public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task
 
@@ -46,6 +45,8 @@
     public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
     public static final String DEFAULT_QUEUE_SIZE = "100000";
 
+    public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
+    public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
 
 
     private final int taskId;
diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java
index 4582b93..d1fd3ae 100644
--- a/src/main/java/geode/kafka/GeodeContext.java
+++ b/src/main/java/geode/kafka/GeodeContext.java
@@ -9,6 +9,7 @@
 import org.apache.geode.cache.query.RegionNotFoundException;
 import org.apache.kafka.connect.errors.ConnectException;
 
+import java.util.Collection;
 import java.util.List;
 
 public class GeodeContext {
@@ -40,7 +41,15 @@
             cq.execute();
             return cq;
         } catch (RegionNotFoundException | CqException | CqExistsException e) {
-            e.printStackTrace();
+            throw new ConnectException(e);
+        }
+    }
+
+    public Collection newCqWithInitialResults(String name, String query, CqAttributes cqAttributes, boolean isDurable) throws ConnectException {
+        try {
+            CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
+            return cq.executeWithInitialResults();
+        } catch (RegionNotFoundException | CqException | CqExistsException e) {
             throw new ConnectException(e);
         }
     }
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index 36b9942..8f60471 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -17,10 +17,12 @@
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
 import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
 import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
 import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
 
@@ -69,6 +71,7 @@
     props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
     props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE);
     props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX);
+    props.computeIfAbsent(LOAD_ENTIRE_REGION, (key) -> DEFAULT_LOAD_ENTIRE_REGION);
     return props;
   }
 
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index 019bb5a..8ae0045 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -14,14 +14,19 @@
 
     public String regionName;
     private BlockingQueue<GeodeEvent> eventBuffer;
+    private volatile boolean initialResultsLoaded;
 
     public GeodeKafkaSourceListener(BlockingQueue<GeodeEvent> eventBuffer, String regionName) {
         this.eventBuffer = eventBuffer;
         this.regionName = regionName;
+        initialResultsLoaded = false;
     }
 
     @Override
     public void onEvent(CqEvent aCqEvent) {
+        while (!initialResultsLoaded) {
+            Thread.yield();
+        }
         try {
             eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
@@ -52,4 +57,8 @@
     public void onCqConnected() {
 
     }
+
+    public void signalInitialResultsLoaded() {
+        initialResultsLoaded = true;
+    }
 }
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 829eb29..657b274 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -1,15 +1,17 @@
 package geode.kafka.source;
 
-import geode.kafka.GeodeContext;
 import geode.kafka.GeodeConnectorConfig;
+import geode.kafka.GeodeContext;
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -19,6 +21,7 @@
 
 import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
 import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
 import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
 import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
 
@@ -33,6 +36,7 @@
     private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
 
     private GeodeContext geodeContext;
+    private GeodeConnectorConfig geodeConnectorConfig;
     private List<String> topics;
     private Map<String, Map<String, String>> sourcePartitions;
     private static BlockingQueue<GeodeEvent> eventBuffer;
@@ -53,7 +57,7 @@
     @Override
     public void start(Map<String, String> props) {
         try {
-            GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props);
+            geodeConnectorConfig = new GeodeConnectorConfig(props);
             logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
             geodeContext = new GeodeContext(geodeConnectorConfig);
 
@@ -65,8 +69,9 @@
             topics = geodeConnectorConfig.getTopics();
 
             String cqPrefix = props.get(CQ_PREFIX);
+            boolean loadEntireRegion = Boolean.parseBoolean(props.get(LOAD_ENTIRE_REGION));
 
-            installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix);
+            installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
         }
         catch (Exception e) {
             logger.error("Unable to start source task", e);
@@ -95,23 +100,35 @@
         geodeContext.getClientCache().close(true);
     }
 
-    void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix) {
+    void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
       boolean isDurable = geodeConnectorConfig.isDurable();
       int taskId = geodeConnectorConfig.getTaskId();
         for (String region : geodeConnectorConfig.getRegionNames()) {
-            installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, isDurable);
+            installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, loadEntireRegion, isDurable);
         }
         if (isDurable) {
             geodeContext.getClientCache().readyForEvents();
         }
     }
 
-    void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
+    void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
         CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
-        cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
+        GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
+        cqAttributesFactory.addCqListener(listener);
         CqAttributes cqAttributes = cqAttributesFactory.create();
-        geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
-                isDurable);
+        try {
+            if (loadEntireRegion) {
+                Collection<CqEvent> events = geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
+                        isDurable);
+                events.forEach(event -> eventBuffer.add(new GeodeEvent(regionName, event)));
+            } else {
+                geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
+                        isDurable);
+            }
+        }
+        finally {
+            listener.signalInitialResultsLoaded();
+        }
     }
 
     /**
diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
index 54f9e52..8f06f61 100644
--- a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
@@ -2,7 +2,9 @@
 
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -41,4 +43,16 @@
         List<LocatorHostPort> locators = config.parseLocators(locatorString);
         assertThat(2, is(locators.size()));
     }
+
+    @Test
+    public void durableClientIdShouldNotBeSetIfPropertyIsNotSet() {
+        Map<String, String> props = new HashMap<>();
+        GeodeConnectorConfig config = new GeodeConnectorConfig(props);
+        assertEquals("", config.getDurableClientId());
+    }
+
+    @Test
+    public void cqPrefixShouldBeProperlyCalculatedFromProps() {
+
+    }
 }
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index de78345..8f86cbc 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -1,25 +1,70 @@
 package geode.kafka.source;
 
 import geode.kafka.GeodeConnectorConfig;
+import geode.kafka.GeodeContext;
+import org.apache.geode.cache.query.CqEvent;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
 import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class GeodeKafkaSourceTaskTest {
 
-    @Test
-    public void cqListenerOnEventPopulatesEventsBuffer() {
 
+    @Test
+    public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() {
+        GeodeContext geodeContext = mock(GeodeContext.class);
+        BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+        boolean loadEntireRegion = true;
+        boolean isDurable = false;
+        List<CqEvent> fakeInitialResults = new LinkedList<>();
+        for (int i = 0; i < 10; i++) {
+            fakeInitialResults.add(mock(CqEvent.class));
+        }
+
+        when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+        assertEquals(10, eventBuffer.size());
     }
 
     @Test
+    public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
+        GeodeContext geodeContext = mock(GeodeContext.class);
+        BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+        boolean loadEntireRegion = false;
+        boolean isDurable = false;
+        List<CqEvent> fakeInitialResults = new LinkedList<>();
+        for (int i = 0; i < 10; i++) {
+            fakeInitialResults.add(mock(CqEvent.class));
+        }
+
+        when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+        assertEquals(0, eventBuffer.size());
+    }
+
+    @Test
+    public void cqListenerOnEventPopulatesEventsBuffer() {}
+
+    @Test
     public void pollReturnsEventsWhenEventBufferHasValues() {
 
     }
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
index 717d495..5ffd363 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -1,5 +1,6 @@
 package geode.kafka.source;
 
+import geode.kafka.GeodeConnectorConfig;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -7,16 +8,6 @@
 
 public class GeodeKafkaSourceTest {
 
-    @Test
-    public void durableClientIdShouldNotBeSetIfPropertyIsNotSet() {
-        GeodeKafkaSource source = new GeodeKafkaSource();
-        Map<String, String> props = new HashMap<>();
-        source.start(props);
 
-    }
 
-    @Test
-    public void cqPrefixShouldBeProperlyCalculatedFromProps() {
-
-    }
 }