End-to-end tests now use unique topic/region names per connector direction, in case of cyclical/endless looping of events
diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index f0d7f06..ce26354 100644
--- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -33,14 +33,13 @@
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 public class GeodeKafkaTestCluster {
@@ -49,8 +48,11 @@
   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
   private static boolean debug = true;
 
-  public static String TEST_TOPICS = "someTopic";
-  public static String TEST_REGIONS = "someRegion";
+  public static String TEST_TOPIC = "someTopic";
+  public static String TEST_REGION = "someRegion";
+
+  public static String TEST_TOPIC_FOR_SINK = "someTopicForSink";
+  public static String TEST_REGION_FOR_SINK = "someRegionForSink";
 
   private static ZooKeeperLocalCluster zooKeeperLocalCluster;
   private static KafkaLocalCluster kafkaLocalCluster;
@@ -83,7 +85,8 @@
     KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
             15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
     AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-    adminZkClient.deleteTopic(TEST_TOPICS);
+    adminZkClient.deleteTopic(TEST_TOPIC);
+    adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
 
     kafkaLocalCluster.stop();
     geodeLocalCluster.stop();
@@ -103,7 +106,9 @@
     Properties topicProperties = new Properties();
     topicProperties.put("flush.messages", "1");
     AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-    adminZkClient.createTopic(TEST_TOPICS,1
+    adminZkClient.createTopic(TEST_TOPIC,1
+            ,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
+    adminZkClient.createTopic(TEST_TOPIC_FOR_SINK,1
             ,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
   }
 
@@ -174,7 +179,7 @@
       final Consumer<String, String> consumer =
               new KafkaConsumer<>(props);
       // Subscribe to the topic.
-      consumer.subscribe(Collections.singletonList(TEST_TOPICS));
+      consumer.subscribe(Collections.singletonList(TEST_TOPIC));
       return consumer;
   }
 
@@ -196,7 +201,7 @@
   @Test
   public void endToEndSourceTest() {
     ClientCache client = createGeodeClient();
-    Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS);
+    Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION);
 
     //right now just verify something makes it end to end
     AtomicInteger valueReceived = new AtomicInteger(0);
@@ -204,25 +209,24 @@
       region.put("KEY", "VALUE" + System.currentTimeMillis());
       ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(4));
       for (ConsumerRecord<String, String> record: records) {
-//        System.out.println("WE consumed a record:" + record);
         valueReceived.incrementAndGet();
       }
-      return valueReceived.get() > 0;
+      return valueReceived.get() >= 10;
     });
   }
 
   @Test
   public void endToEndSinkTest() {
     ClientCache client = createGeodeClient();
-    Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS);
+    Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
 
     Producer<String, String> producer = createProducer();
     for (int i = 0; i < 10; i++) {
-      producer.send(new ProducerRecord(TEST_TOPICS, "KEY" + i, "VALUE" + i));
+      producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i));
     }
 
     int i = 0;
-    await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertNotNull(region.get("KEY" + i)));
+    await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
   }
 
 }
diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index d6fc7a6..a33c135 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -20,8 +20,10 @@
 
 import static geode.kafka.GeodeConnectorConfig.REGIONS;
 import static geode.kafka.GeodeConnectorConfig.TOPICS;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGIONS;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPICS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
 
 public class WorkerAndHerderWrapper {
 
@@ -57,8 +59,8 @@
         sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
         sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
         sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
-        sourceProps.put(REGIONS, TEST_REGIONS);
-        sourceProps.put(TOPICS, TEST_TOPICS);
+        sourceProps.put(REGIONS, TEST_REGION);
+        sourceProps.put(TOPICS, TEST_TOPIC);
 
         herder.putConnectorConfig(
                 sourceProps.get(ConnectorConfig.NAME_CONFIG),
@@ -69,8 +71,8 @@
         sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
         sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
         sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
-        sinkProps.put(REGIONS, TEST_REGIONS);
-        sinkProps.put(TOPICS, TEST_TOPICS);
+        sinkProps.put(REGIONS, TEST_REGION_FOR_SINK);
+        sinkProps.put(TOPICS, TEST_TOPIC_FOR_SINK);
 
         herder.putConnectorConfig(
                 sinkProps.get(ConnectorConfig.NAME_CONFIG),