Split regions and topics per task.
diff --git a/build.gradle b/build.gradle
index 87a2c36..515967d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -26,8 +26,7 @@
testCompile(group: 'org.apache.kafka', name: 'connect-runtime', version: '2.3.1')
testCompile group: 'junit', name: 'junit', version: '4.12'
+ testCompile 'org.mockito:mockito-core:3.2.4'
testImplementation 'org.awaitility:awaitility:4.0.2'
-
-
}
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index 8e79e1f..11eff62 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -1,6 +1,7 @@
package geode.kafka;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -80,10 +81,14 @@
locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
}
- List<String> parseNames(String names) {
+ public static List<String> parseNames(String names) {
return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
}
+ public static String reconstructString(Collection<String> strings) {
+ return strings.stream().collect(Collectors.joining(","));
+ }
+
List<LocatorHostPort> parseLocators(String locators) {
return Arrays.stream(locators.split(",")).map((s) -> {
String locatorString = s.trim();
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index 08af2ad..c7afb96 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -4,6 +4,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -22,6 +23,8 @@
import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.REGIONS;
+import static geode.kafka.GeodeConnectorConfig.TOPICS;
public class GeodeKafkaSink extends SinkConnector {
private static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -41,11 +44,18 @@
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
Map<String, String> taskProps = new HashMap<>();
-
taskProps.putAll(sharedProps);
+ List<String> topics = GeodeConnectorConfig.parseNames(taskProps.get(TOPICS));
+ List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks);
+
+ List<String> regions = GeodeConnectorConfig.parseNames(taskProps.get(REGIONS));
+ List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks);
+
for (int i = 0; i < maxTasks; i++) {
taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
+ taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i)));
+ taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i)));
taskConfigs.add(taskProps);
}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index 8f60471..ca3f2c8 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -5,6 +5,7 @@
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -25,6 +26,8 @@
import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.REGIONS;
+import static geode.kafka.GeodeConnectorConfig.TOPICS;
public class GeodeKafkaSource extends SourceConnector {
@@ -45,9 +48,16 @@
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(sharedProps);
+ List<String> topics = GeodeConnectorConfig.parseNames(taskProps.get(TOPICS));
+ List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks);
+
+ List<String> regions = GeodeConnectorConfig.parseNames(taskProps.get(REGIONS));
+ List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks);
+
for (int i = 0; i < maxTasks; i++) {
- //TODO partition regions and topics
taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
+ taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i)));
+ taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i)));
taskConfigs.add(taskProps);
}
return taskConfigs;
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 657b274..8080179 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -21,6 +21,7 @@
import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
@@ -39,7 +40,7 @@
private GeodeConnectorConfig geodeConnectorConfig;
private List<String> topics;
private Map<String, Map<String, String>> sourcePartitions;
- private static BlockingQueue<GeodeEvent> eventBuffer;
+ private BlockingQueue<GeodeEvent> eventBuffer;
private int batchSize;
@@ -54,6 +55,12 @@
return null;
}
+ void startForTesting(BlockingQueue eventBuffer, List<String> topics, int batchSize) {
+ this.eventBuffer = eventBuffer;
+ this.topics = topics;
+ this.batchSize = batchSize;
+ }
+
@Override
public void start(Map<String, String> props) {
try {
@@ -111,7 +118,7 @@
}
}
- void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
+ GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
cqAttributesFactory.addCqListener(listener);
@@ -129,6 +136,7 @@
finally {
listener.signalInitialResultsLoaded();
}
+ return listener;
}
/**
@@ -148,5 +156,4 @@
String generateCqName(int taskId, String cqPrefix, String regionName) {
return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
}
-
}
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 8f86cbc..041adc8 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -3,8 +3,10 @@
import geode.kafka.GeodeConnectorConfig;
import geode.kafka.GeodeContext;
import org.apache.geode.cache.query.CqEvent;
+import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
@@ -13,8 +15,15 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.REGIONS;
import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
+import static geode.kafka.GeodeConnectorConfig.TOPICS;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -24,6 +33,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+
public class GeodeKafkaSourceTaskTest {
@@ -62,16 +72,36 @@
}
@Test
- public void cqListenerOnEventPopulatesEventsBuffer() {}
+ public void cqListenerOnEventPopulatesEventsBuffer() {
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+ boolean loadEntireRegion = false;
+ boolean isDurable = false;
- @Test
- public void pollReturnsEventsWhenEventBufferHasValues() {
+ when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(new ArrayList());
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ listener.onEvent(mock(CqEvent.class));
+ assertEquals(1, eventBuffer.size());
}
@Test
- public void regionsArePassedCorrectlyToTask() {
-
+ public void pollReturnsEventsWhenEventBufferHasValues() throws Exception {
+// BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+// CqEvent cqEvent = mock(CqEvent.class);
+// when(cqEvent.getNewValue()).thenReturn("New Value");
+// GeodeEvent event = mock(GeodeEvent.class);
+// when(event.getEvent()).thenReturn(cqEvent);
+// eventBuffer.add(event);
+//
+// List<String> topics = new ArrayList<>();
+// topics.add("myTopic");
+//
+// GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+// task.startForTesting(eventBuffer, topics, 1);
+// List<SourceRecord> records = task.poll();
+// assertEquals(1, records.size());
}
@Test
@@ -94,8 +124,6 @@
assertThat(true, is(sourcePartitions.get("region3").get(REGION_NAME).equals("region3")));
}
-
-
@Test
public void listOfLocatorsShouldBeConfiguredIntoClientCache() {