end to end tests use unique topic/regions, incase of cyclical/endless looping of events
diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index f0d7f06..ce26354 100644
--- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -33,14 +33,13 @@
import java.io.IOException;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class GeodeKafkaTestCluster {
@@ -49,8 +48,11 @@
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
private static boolean debug = true;
- public static String TEST_TOPICS = "someTopic";
- public static String TEST_REGIONS = "someRegion";
+ public static String TEST_TOPIC = "someTopic";
+ public static String TEST_REGION = "someRegion";
+
+ public static String TEST_TOPIC_FOR_SINK = "someTopicForSink";
+ public static String TEST_REGION_FOR_SINK = "someTopicForSink";
private static ZooKeeperLocalCluster zooKeeperLocalCluster;
private static KafkaLocalCluster kafkaLocalCluster;
@@ -83,7 +85,8 @@
KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.deleteTopic(TEST_TOPICS);
+ adminZkClient.deleteTopic(TEST_TOPIC);
+ adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
kafkaLocalCluster.stop();
geodeLocalCluster.stop();
@@ -103,7 +106,9 @@
Properties topicProperties = new Properties();
topicProperties.put("flush.messages", "1");
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.createTopic(TEST_TOPICS,1
+ adminZkClient.createTopic(TEST_TOPIC,1
+ ,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
+ adminZkClient.createTopic(TEST_TOPIC_FOR_SINK,1
,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
}
@@ -174,7 +179,7 @@
final Consumer<String, String> consumer =
new KafkaConsumer<>(props);
// Subscribe to the topic.
- consumer.subscribe(Collections.singletonList(TEST_TOPICS));
+ consumer.subscribe(Collections.singletonList(TEST_TOPIC));
return consumer;
}
@@ -196,7 +201,7 @@
@Test
public void endToEndSourceTest() {
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS);
+ Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION);
//right now just verify something makes it end to end
AtomicInteger valueReceived = new AtomicInteger(0);
@@ -204,25 +209,24 @@
region.put("KEY", "VALUE" + System.currentTimeMillis());
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(4));
for (ConsumerRecord<String, String> record: records) {
-// System.out.println("WE consumed a record:" + record);
valueReceived.incrementAndGet();
}
- return valueReceived.get() > 0;
+ return valueReceived.get() == 10;
});
}
@Test
public void endToEndSinkTest() {
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS);
+ Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
Producer<String, String> producer = createProducer();
for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord(TEST_TOPICS, "KEY" + i, "VALUE" + i));
+ producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i));
}
int i = 0;
- await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertNotNull(region.get("KEY" + i)));
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
}
}
diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index d6fc7a6..a33c135 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -20,8 +20,10 @@
import static geode.kafka.GeodeConnectorConfig.REGIONS;
import static geode.kafka.GeodeConnectorConfig.TOPICS;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGIONS;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPICS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
public class WorkerAndHerderWrapper {
@@ -57,8 +59,8 @@
sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
- sourceProps.put(REGIONS, TEST_REGIONS);
- sourceProps.put(TOPICS, TEST_TOPICS);
+ sourceProps.put(REGIONS, TEST_REGION);
+ sourceProps.put(TOPICS, TEST_TOPIC);
herder.putConnectorConfig(
sourceProps.get(ConnectorConfig.NAME_CONFIG),
@@ -69,8 +71,8 @@
sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
- sinkProps.put(REGIONS, TEST_REGIONS);
- sinkProps.put(TOPICS, TEST_TOPICS);
+ sinkProps.put(REGIONS, TEST_REGION_FOR_SINK);
+ sinkProps.put(TOPICS, TEST_TOPIC_FOR_SINK);
herder.putConnectorConfig(
sinkProps.get(ConnectorConfig.NAME_CONFIG),