Configurable shared event buffer
diff --git a/src/main/java/geode/kafka/source/EventBufferSupplier.java b/src/main/java/geode/kafka/source/EventBufferSupplier.java
new file mode 100644
index 0000000..d844744
--- /dev/null
+++ b/src/main/java/geode/kafka/source/EventBufferSupplier.java
@@ -0,0 +1,7 @@
+package geode.kafka.source;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Supplier;
+
+public interface EventBufferSupplier extends Supplier<BlockingQueue<GeodeEvent>> {
+}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index f317965..5c8a152 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -19,7 +19,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
class GeodeKafkaSourceListener implements CqStatusListener {
@@ -27,12 +26,12 @@
private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceListener.class);
public String regionName;
- private BlockingQueue<GeodeEvent> eventBuffer;
+ private EventBufferSupplier eventBufferSupplier;
private boolean initialResultsLoaded;
- public GeodeKafkaSourceListener(BlockingQueue<GeodeEvent> eventBuffer, String regionName) {
- this.eventBuffer = eventBuffer;
+ public GeodeKafkaSourceListener(EventBufferSupplier eventBufferSupplier, String regionName) {
this.regionName = regionName;
+ this.eventBufferSupplier = eventBufferSupplier;
initialResultsLoaded = false;
}
@@ -42,12 +41,12 @@
Thread.yield();
}
try {
- eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
+ eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
while (true) {
try {
- if (!eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS))
+ if (eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS))
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index c6cf6cb..6efeb85 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -29,7 +29,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
@@ -48,11 +47,9 @@
private GeodeContext geodeContext;
private GeodeSourceConnectorConfig geodeConnectorConfig;
- private int taskId;
+ private EventBufferSupplier eventBufferSupplier;
private Map<String, List<String>> regionToTopics;
- private Collection<String> cqsToRegister;
private Map<String, Map<String, String>> sourcePartitions;
- private static BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100000);
private int batchSize;
@@ -71,21 +68,21 @@
public void start(Map<String, String> props) {
try {
geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
- taskId = geodeConnectorConfig.getTaskId();
+ // taskId field removed; geodeConnectorConfig.getTaskId() is called directly where needed
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(), geodeConnectorConfig.getSecurityClientAuthInit());
batchSize = Integer.parseInt(props.get(BATCH_SIZE));
+ eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(QUEUE_SIZE)));
regionToTopics = geodeConnectorConfig.getRegionToTopics();
- cqsToRegister = geodeConnectorConfig.getCqsToRegister();
+ // cqsToRegister field removed; installOnGeode() calls geodeConnectorConfig.getCqsToRegister() itself
sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
String cqPrefix = geodeConnectorConfig.getCqPrefix();
-
boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
- installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
+ installOnGeode(geodeConnectorConfig, geodeContext, eventBufferSupplier, cqPrefix, loadEntireRegion);
} catch (Exception e) {
e.printStackTrace();
logger.error("Unable to start source task", e);
@@ -97,7 +94,7 @@
public List<SourceRecord> poll() throws InterruptedException {
ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
- if (eventBuffer.drainTo(events, batchSize) > 0) {
+ if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
for (GeodeEvent event : events) {
String regionName = event.getRegionName();
List<String> topics = regionToTopics.get(regionName);
@@ -116,7 +113,7 @@
geodeContext.getClientCache().close(true);
}
- void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
+ void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
boolean isDurable = geodeConnectorConfig.isDurable();
int taskId = geodeConnectorConfig.getTaskId();
for (String region : geodeConnectorConfig.getCqsToRegister()) {
@@ -127,7 +124,7 @@
}
}
- GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
+ GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
cqAttributesFactory.addCqListener(listener);
@@ -136,7 +133,7 @@
if (loadEntireRegion) {
Collection<CqEvent> events = geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
isDurable);
- eventBuffer.addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
+ eventBuffer.get().addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
} else {
geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
isDurable);
diff --git a/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java b/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
new file mode 100644
index 0000000..2a4c883
--- /dev/null
+++ b/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
@@ -0,0 +1,36 @@
+package geode.kafka.source;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class SharedEventBufferSupplier implements EventBufferSupplier {
+
+ private static volatile BlockingQueue<GeodeEvent> eventBuffer;
+
+ public SharedEventBufferSupplier(int size) {
+ recreateEventBufferIfNeeded(size);
+ }
+
+ BlockingQueue<GeodeEvent> recreateEventBufferIfNeeded(int size) {
+ if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
+ synchronized (SharedEventBufferSupplier.class) {
+ if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
+ BlockingQueue<GeodeEvent> oldEventBuffer = eventBuffer;
+ eventBuffer = new LinkedBlockingQueue<>(size);
+ if (oldEventBuffer != null) {
+ eventBuffer.addAll(oldEventBuffer);
+ }
+ }
+ }
+ }
+ return eventBuffer;
+ }
+
+ /**
+ * Callers should not cache the returned queue; always call get() so the latest buffer is used
+ * in case the shared buffer has been recreated with a different capacity.
+ */
+ public BlockingQueue<GeodeEvent> get() {
+ return eventBuffer;
+ }
+}
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index ffcc3d8..33f1ab5 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -58,7 +58,7 @@
when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
assertEquals(10, eventBuffer.size());
}
@@ -75,7 +75,7 @@
when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
assertEquals(0, eventBuffer.size());
}
@@ -88,7 +88,7 @@
when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(new ArrayList());
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
listener.onEvent(mock(CqEvent.class));
assertEquals(1, eventBuffer.size());
@@ -140,7 +140,7 @@
when (config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installOnGeode(config, geodeContext, new LinkedBlockingQueue(), "someCqPrefix", true);
+ task.installOnGeode(config, geodeContext, createEventBufferSupplier(new LinkedBlockingQueue<>()), "someCqPrefix", true);
verify(geodeContext, times(1)).newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean());
}
@@ -218,4 +218,13 @@
// GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
}
+
+ private EventBufferSupplier createEventBufferSupplier(BlockingQueue<GeodeEvent> eventBuffer) {
+ return new EventBufferSupplier() {
+ @Override
+ public BlockingQueue<GeodeEvent> get() {
+ return eventBuffer;
+ }
+ };
+ }
}
diff --git a/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java b/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
new file mode 100644
index 0000000..b4a429b
--- /dev/null
+++ b/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
@@ -0,0 +1,51 @@
+package geode.kafka.source;
+
+import org.junit.Test;
+
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+public class SharedEventBufferSupplierTest {
+
+ @Test
+ public void creatingNewSharedEventSupplierShouldCreateInstance() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ assertNotNull(supplier.get());
+ }
+
+ @Test
+ public void alreadySharedEventSupplierShouldReturnSameInstanceOfEventBuffer() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ BlockingQueue<GeodeEvent> queue = supplier.get();
+ supplier = new SharedEventBufferSupplier(1);
+ assertEquals(queue, supplier.get());
+ }
+
+ @Test
+ public void newEventBufferShouldBeReflectedInAllSharedSuppliers() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+ assertEquals(supplier.get(), newSupplier.get());
+ }
+
+ @Test
+ public void newEventBufferSuppliedShouldNotBeTheOldQueue() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ BlockingQueue<GeodeEvent> queue = supplier.get();
+ SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+ assertNotEquals(queue, newSupplier.get());
+ }
+
+ @Test
+ public void newEventBufferShouldContainAllEventsFromTheOldSupplier() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ GeodeEvent geodeEvent = mock(GeodeEvent.class);
+ supplier.get().add(geodeEvent);
+ SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+ assertEquals(geodeEvent, newSupplier.get().poll());
+ }
+}