Configurable shared event buffer
diff --git a/src/main/java/geode/kafka/source/EventBufferSupplier.java b/src/main/java/geode/kafka/source/EventBufferSupplier.java
new file mode 100644
index 0000000..d844744
--- /dev/null
+++ b/src/main/java/geode/kafka/source/EventBufferSupplier.java
@@ -0,0 +1,7 @@
+package geode.kafka.source;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Supplier;
+
+public interface EventBufferSupplier extends Supplier<BlockingQueue<GeodeEvent>> {
+}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index f317965..5c8a152 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -19,7 +19,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 class GeodeKafkaSourceListener implements CqStatusListener {
@@ -27,12 +26,12 @@
     private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceListener.class);
 
     public String regionName;
-    private BlockingQueue<GeodeEvent> eventBuffer;
+    private EventBufferSupplier eventBufferSupplier;
     private boolean initialResultsLoaded;
 
-    public GeodeKafkaSourceListener(BlockingQueue<GeodeEvent> eventBuffer, String regionName) {
-        this.eventBuffer = eventBuffer;
+    public GeodeKafkaSourceListener(EventBufferSupplier eventBufferSupplier, String regionName) {
         this.regionName = regionName;
+        this.eventBufferSupplier = eventBufferSupplier;
         initialResultsLoaded = false;
     }
 
@@ -42,12 +41,12 @@
             Thread.yield();
         }
         try {
-            eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
+            eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
 
             while (true) {
                 try {
-                    if (!eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS))
+                    if (!eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS))
                         break;
                 } catch (InterruptedException ex) {
                     ex.printStackTrace();
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index c6cf6cb..6efeb85 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -29,7 +29,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
 import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
@@ -48,11 +47,9 @@
 
     private GeodeContext geodeContext;
     private GeodeSourceConnectorConfig geodeConnectorConfig;
-    private int taskId;
+    private EventBufferSupplier eventBufferSupplier;
     private Map<String, List<String>> regionToTopics;
-    private Collection<String> cqsToRegister;
     private Map<String, Map<String, String>> sourcePartitions;
-    private static BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100000);
     private int batchSize;
 
 
@@ -71,21 +68,21 @@
     public void start(Map<String, String> props) {
         try {
             geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
-            taskId = geodeConnectorConfig.getTaskId();
+            int taskId = geodeConnectorConfig.getTaskId();
             logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
             geodeContext = new GeodeContext();
             geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(), geodeConnectorConfig.getSecurityClientAuthInit());
 
             batchSize = Integer.parseInt(props.get(BATCH_SIZE));
+            eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(QUEUE_SIZE)));
 
             regionToTopics = geodeConnectorConfig.getRegionToTopics();
-            cqsToRegister = geodeConnectorConfig.getCqsToRegister();
+            geodeConnectorConfig.getCqsToRegister();
             sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
 
             String cqPrefix = geodeConnectorConfig.getCqPrefix();
-
             boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
-            installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
+            installOnGeode(geodeConnectorConfig, geodeContext, eventBufferSupplier, cqPrefix, loadEntireRegion);
         } catch (Exception e) {
             e.printStackTrace();
             logger.error("Unable to start source task", e);
@@ -97,7 +94,7 @@
     public List<SourceRecord> poll() throws InterruptedException {
         ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
         ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
-        if (eventBuffer.drainTo(events, batchSize) > 0) {
+        if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
             for (GeodeEvent event : events) {
                 String regionName = event.getRegionName();
                 List<String> topics = regionToTopics.get(regionName);
@@ -116,7 +113,7 @@
         geodeContext.getClientCache().close(true);
     }
 
-    void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
+    void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
         boolean isDurable = geodeConnectorConfig.isDurable();
         int taskId = geodeConnectorConfig.getTaskId();
         for (String region : geodeConnectorConfig.getCqsToRegister()) {
@@ -127,7 +124,7 @@
         }
     }
 
-    GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
+    GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
         CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
         GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
         cqAttributesFactory.addCqListener(listener);
@@ -136,7 +133,7 @@
             if (loadEntireRegion) {
                 Collection<CqEvent> events = geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
                         isDurable);
-                eventBuffer.addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
+                eventBuffer.get().addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
             } else {
                 geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
                         isDurable);
diff --git a/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java b/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
new file mode 100644
index 0000000..2a4c883
--- /dev/null
+++ b/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
@@ -0,0 +1,37 @@
+package geode.kafka.source;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+public class SharedEventBufferSupplier implements EventBufferSupplier {
+
+    private static BlockingQueue<GeodeEvent> eventBuffer;
+
+    public SharedEventBufferSupplier(int size) {
+        recreateEventBufferIfNeeded(size);
+    }
+
+    BlockingQueue recreateEventBufferIfNeeded(int size) {
+        if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
+            synchronized (GeodeKafkaSource.class) {
+                if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
+                    BlockingQueue<GeodeEvent> oldEventBuffer = eventBuffer;
+                    eventBuffer = new LinkedBlockingQueue<>(size);
+                    if (oldEventBuffer != null) {
+                        eventBuffer.addAll(oldEventBuffer);
+                    }
+                }
+            }
+        }
+        return eventBuffer;
+    }
+
+    /**
+     * Callers should not store a reference to this and instead always call get to make sure we always use the latest buffer
+     * Buffers themselves shouldn't change often but in cases where we want to modify the size
+     */
+    public BlockingQueue<GeodeEvent> get() {
+        return eventBuffer;
+    }
+}
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index ffcc3d8..33f1ab5 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -58,7 +58,7 @@
 
         when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
         GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+        task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
         assertEquals(10, eventBuffer.size());
     }
 
@@ -75,7 +75,7 @@
 
         when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
         GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+        task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
         assertEquals(0, eventBuffer.size());
     }
 
@@ -88,7 +88,7 @@
 
         when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(new ArrayList());
         GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+        GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
 
         listener.onEvent(mock(CqEvent.class));
         assertEquals(1, eventBuffer.size());
@@ -140,7 +140,7 @@
         when (config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
 
         GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.installOnGeode(config, geodeContext, new LinkedBlockingQueue(), "someCqPrefix", true);
+        task.installOnGeode(config, geodeContext, createEventBufferSupplier(new LinkedBlockingQueue<>()), "someCqPrefix", true);
         verify(geodeContext, times(1)).newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean());
     }
 
@@ -218,4 +218,13 @@
 //        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
     }
 
+
+    private EventBufferSupplier createEventBufferSupplier(BlockingQueue<GeodeEvent> eventBuffer) {
+        return new EventBufferSupplier() {
+            @Override
+            public BlockingQueue<GeodeEvent> get() {
+                return eventBuffer;
+            }
+        };
+    }
 }
diff --git a/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java b/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
new file mode 100644
index 0000000..b4a429b
--- /dev/null
+++ b/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
@@ -0,0 +1,51 @@
+package geode.kafka.source;
+
+import org.junit.Test;
+
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+public class SharedEventBufferSupplierTest {
+
+    @Test
+    public void creatingNewSharedEventSupplierShouldCreateInstance() {
+        SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+        assertNotNull(supplier.get());
+    }
+
+    @Test
+    public void alreadySharedEventSupplierShouldReturnSameInstanceOfEventBuffer() {
+        SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+        BlockingQueue<GeodeEvent> queue = supplier.get();
+        supplier = new SharedEventBufferSupplier(1);
+        assertEquals(queue, supplier.get());
+    }
+
+    @Test
+    public void newEventBufferShouldBeReflectedInAllSharedSuppliers() {
+        SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+        SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+        assertEquals(supplier.get(), newSupplier.get());
+    }
+
+    @Test
+    public void newEventBufferSuppliedShouldNotBeTheOldQueue() {
+        SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+        BlockingQueue<GeodeEvent> queue = supplier.get();
+        SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+        assertNotEquals(queue, newSupplier.get());
+    }
+
+    @Test
+    public void newEventBufferShouldContainAllEventsFromTheOldSupplier() {
+        SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+        GeodeEvent geodeEvent = mock(GeodeEvent.class);
+        supplier.get().add(geodeEvent);
+        SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+        assertEquals(geodeEvent, newSupplier.get().poll());
+    }
+}