Renamed the package to org.apache
diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
similarity index 99%
rename from src/main/java/org/geode/kafka/GeodeConnectorConfig.java
rename to src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 7fbfb55..476c07c 100644
--- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka;
+package org.apache.geode.kafka;
 
 import java.util.Arrays;
 import java.util.Collection;
diff --git a/src/main/java/org/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java
similarity index 98%
rename from src/main/java/org/geode/kafka/GeodeContext.java
rename to src/main/java/org/apache/geode/kafka/GeodeContext.java
index 9f30242..02cc85f 100644
--- a/src/main/java/org/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka;
+package org.apache.geode.kafka;
 
 import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
 import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
diff --git a/src/main/java/org/geode/kafka/LocatorHostPort.java b/src/main/java/org/apache/geode/kafka/LocatorHostPort.java
similarity index 97%
rename from src/main/java/org/geode/kafka/LocatorHostPort.java
rename to src/main/java/org/apache/geode/kafka/LocatorHostPort.java
index 5c71fa1..d879d8e 100644
--- a/src/main/java/org/geode/kafka/LocatorHostPort.java
+++ b/src/main/java/org/apache/geode/kafka/LocatorHostPort.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka;
+package org.apache.geode.kafka;
 
 public class LocatorHostPort {
 
diff --git a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
similarity index 97%
rename from src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
rename to src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
index 6b646ee..4f3e414 100644
--- a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
+++ b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.security;
+package org.apache.geode.kafka.security;
 
 import java.util.Properties;
 
diff --git a/src/main/java/org/geode/kafka/sink/BatchRecords.java b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
similarity index 98%
rename from src/main/java/org/geode/kafka/sink/BatchRecords.java
rename to src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
index 049abac..45a93d6 100644
--- a/src/main/java/org/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
 
 import java.util.ArrayList;
 import java.util.Collection;
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
similarity index 91%
rename from src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
rename to src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
index 9ee5189..e4754f7 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
@@ -12,9 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.sink;
-
-import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
+package org.apache.geode.kafka.sink;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -24,7 +22,7 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
-import org.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.GeodeConnectorConfig;
 
 public class GeodeKafkaSink extends SinkConnector {
   private Map<String, String> sharedProps;
@@ -62,7 +60,7 @@
 
   @Override
   public ConfigDef config() {
-    return SINK_CONFIG_DEF;
+    return GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
   }
 
   @Override
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
similarity index 98%
rename from src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
rename to src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index 7c77c7f..023284e 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -22,7 +22,7 @@
 
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
-import org.geode.kafka.GeodeContext;
+import org.apache.geode.kafka.GeodeContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
similarity index 96%
rename from src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
rename to src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
index a074220..47eb165 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -12,13 +12,13 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
 
 import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.common.config.ConfigDef;
-import org.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.GeodeConnectorConfig;
 
 public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
   public static final ConfigDef SINK_CONFIG_DEF = configurables();
diff --git a/src/main/java/org/geode/kafka/source/EventBufferSupplier.java b/src/main/java/org/apache/geode/kafka/source/EventBufferSupplier.java
similarity index 95%
rename from src/main/java/org/geode/kafka/source/EventBufferSupplier.java
rename to src/main/java/org/apache/geode/kafka/source/EventBufferSupplier.java
index be40602..843c305 100644
--- a/src/main/java/org/geode/kafka/source/EventBufferSupplier.java
+++ b/src/main/java/org/apache/geode/kafka/source/EventBufferSupplier.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.function.Supplier;
diff --git a/src/main/java/org/geode/kafka/source/GeodeEvent.java b/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java
similarity index 97%
rename from src/main/java/org/geode/kafka/source/GeodeEvent.java
rename to src/main/java/org/apache/geode/kafka/source/GeodeEvent.java
index 5b51d07..654b05a 100644
--- a/src/main/java/org/geode/kafka/source/GeodeEvent.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
 
 import org.apache.geode.cache.query.CqEvent;
 
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
similarity index 93%
rename from src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
rename to src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
index 7b4445e..e8bfaf1 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
@@ -12,9 +12,9 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
 
-import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -26,7 +26,7 @@
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.util.ConnectorUtils;
-import org.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.GeodeConnectorConfig;
 
 
 public class GeodeKafkaSource extends SourceConnector {
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
similarity index 98%
rename from src/main/java/org/geode/kafka/source/GeodeKafkaSourceListener.java
rename to src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
index e875ee4..1d16404 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
 
 import java.util.concurrent.TimeUnit;
 
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
similarity index 98%
rename from src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
rename to src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 4acc081..58de173 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -23,7 +23,7 @@
 
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
-import org.geode.kafka.GeodeContext;
+import org.apache.geode.kafka.GeodeContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
similarity index 98%
rename from src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
rename to src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
index e96796b..339551a 100644
--- a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -12,14 +12,14 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.common.config.ConfigDef;
-import org.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.GeodeConnectorConfig;
 
 public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
 
diff --git a/src/main/java/org/geode/kafka/source/SharedEventBufferSupplier.java b/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java
similarity index 97%
rename from src/main/java/org/geode/kafka/source/SharedEventBufferSupplier.java
rename to src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java
index 963a132..b3d1268 100644
--- a/src/main/java/org/geode/kafka/source/SharedEventBufferSupplier.java
+++ b/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
diff --git a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
similarity index 85%
rename from src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java
rename to src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
index b0a11e6..5cd3618 100644
--- a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -1,14 +1,14 @@
-package org.geode.kafka;
+package org.apache.geode.kafka;
 
 import static org.awaitility.Awaitility.await;
-import static org.geode.kafka.GeodeKafkaTestUtils.createProducer;
-import static org.geode.kafka.GeodeKafkaTestUtils.createTopic;
-import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic;
-import static org.geode.kafka.GeodeKafkaTestUtils.getKafkaConfig;
-import static org.geode.kafka.GeodeKafkaTestUtils.getZooKeeperProperties;
-import static org.geode.kafka.GeodeKafkaTestUtils.startKafka;
-import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
-import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createProducer;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createTopic;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.deleteTopic;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getKafkaConfig;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getZooKeeperProperties;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startKafka;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startZooKeeper;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
@@ -31,6 +31,8 @@
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.kafka.utilities.KafkaLocalCluster;
+import org.apache.geode.kafka.utilities.WorkerAndHerderCluster;
 import org.apache.geode.test.dunit.rules.ClientVM;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
diff --git a/src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
similarity index 85%
rename from src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java
rename to src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
index d300b55..08aede2 100644
--- a/src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
@@ -12,17 +12,17 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka;
+package org.apache.geode.kafka;
 
-import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer;
-import static org.geode.kafka.GeodeKafkaTestUtils.createTopic;
-import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic;
-import static org.geode.kafka.GeodeKafkaTestUtils.getKafkaConfig;
-import static org.geode.kafka.GeodeKafkaTestUtils.getZooKeeperProperties;
-import static org.geode.kafka.GeodeKafkaTestUtils.startKafka;
-import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
-import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper;
-import static org.geode.kafka.GeodeKafkaTestUtils.verifyEventsAreConsumed;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createConsumer;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createTopic;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.deleteTopic;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getKafkaConfig;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getZooKeeperProperties;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startKafka;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startZooKeeper;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.verifyEventsAreConsumed;
 
 import java.util.Arrays;
 
@@ -43,6 +43,9 @@
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.kafka.utilities.GeodeKafkaTestUtils;
+import org.apache.geode.kafka.utilities.KafkaLocalCluster;
+import org.apache.geode.kafka.utilities.WorkerAndHerderCluster;
 import org.apache.geode.test.dunit.rules.ClientVM;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
diff --git a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
similarity index 95%
rename from src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
rename to src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
index 5c63d98..db7d921 100644
--- a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
@@ -12,11 +12,9 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka;
+package org.apache.geode.kafka;
 
 
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
@@ -137,7 +135,7 @@
   @Test
   public void usesSecurityShouldBeTrueIfSecurityUserSet() {
     Map<String, String> props = new HashMap<>();
-    props.put(SECURITY_USER, "some user");
+    props.put(GeodeConnectorConfig.SECURITY_USER, "some user");
     GeodeConnectorConfig config =
         new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertTrue(config.usesSecurity());
@@ -146,7 +144,7 @@
   @Test
   public void usesSecurityShouldBeTrueIfSecurityClientAuthInitSet() {
     Map<String, String> props = new HashMap<>();
-    props.put(SECURITY_CLIENT_AUTH_INIT, "someclass");
+    props.put(GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT, "someclass");
     GeodeConnectorConfig config =
         new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertTrue(config.usesSecurity());
@@ -163,7 +161,7 @@
   @Test
   public void securityClientAuthInitShouldBeSetIfUserIsSet() {
     Map<String, String> props = new HashMap<>();
-    props.put(SECURITY_USER, "some user");
+    props.put(GeodeConnectorConfig.SECURITY_USER, "some user");
     GeodeConnectorConfig config =
         new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertNotNull(config.getSecurityClientAuthInit());
diff --git a/src/test/java/org/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
similarity index 98%
rename from src/test/java/org/geode/kafka/sink/BatchRecordsTest.java
rename to src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
index c2da554..f59ab7b 100644
--- a/src/test/java/org/geode/kafka/sink/BatchRecordsTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
diff --git a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
similarity index 97%
rename from src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
rename to src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
index 32925fb..dd325f2 100644
--- a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
 
 import static org.geode.kafka.sink.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
 import static org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
@@ -28,7 +28,7 @@
 import java.util.List;
 
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.GeodeConnectorConfig;
 import org.junit.Test;
 
 import org.apache.geode.cache.Region;
diff --git a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
similarity index 92%
rename from src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java
rename to src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
index 28e7033..ea5cf1e 100644
--- a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
@@ -12,9 +12,9 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.sink;
+package org.apache.geode.kafka.sink;
 
-import static org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static org.apache.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -23,7 +23,7 @@
 import java.util.HashSet;
 import java.util.Map;
 
-import org.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.GeodeConnectorConfig;
 import org.junit.Test;
 
 public class GeodeKafkaSinkTest {
diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
similarity index 97%
rename from src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
rename to src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 4fa7d81..5125a91 100644
--- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -12,10 +12,10 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
 
-import static org.geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
-import static org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
+import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
+import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -35,7 +35,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.geode.kafka.GeodeContext;
+import org.apache.geode.kafka.GeodeContext;
 import org.junit.Test;
 
 import org.apache.geode.cache.client.ClientCache;
diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
similarity index 92%
rename from src/test/java/org/geode/kafka/source/GeodeKafkaSourceTest.java
rename to src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
index 433550a..786fccf 100644
--- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -12,9 +12,9 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
 
-import static org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -23,7 +23,7 @@
 import java.util.HashSet;
 import java.util.Map;
 
-import org.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.GeodeConnectorConfig;
 import org.junit.Test;
 
 public class GeodeKafkaSourceTest {
diff --git a/src/test/java/org/geode/kafka/source/GeodeSourceConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
similarity index 87%
rename from src/test/java/org/geode/kafka/source/GeodeSourceConnectorConfigTest.java
rename to src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
index fdcd7d3..641a2ac 100644
--- a/src/test/java/org/geode/kafka/source/GeodeSourceConnectorConfigTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
@@ -12,15 +12,15 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
 
-import static org.geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
 import static org.junit.Assert.assertEquals;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import org.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.GeodeConnectorConfig;
 import org.junit.Test;
 
 public class GeodeSourceConnectorConfigTest {
diff --git a/src/test/java/org/geode/kafka/source/SharedEventBufferSupplierTest.java b/src/test/java/org/apache/geode/kafka/source/SharedEventBufferSupplierTest.java
similarity index 98%
rename from src/test/java/org/geode/kafka/source/SharedEventBufferSupplierTest.java
rename to src/test/java/org/apache/geode/kafka/source/SharedEventBufferSupplierTest.java
index 92de30d..2683a61 100644
--- a/src/test/java/org/geode/kafka/source/SharedEventBufferSupplierTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/SharedEventBufferSupplierTest.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka.source;
+package org.apache.geode.kafka.source;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
similarity index 82%
rename from src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
rename to src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
index abe0d6a..c8e1bb0 100644
--- a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka;
+package org.apache.geode.kafka.utilities;
 
 import static org.awaitility.Awaitility.await;
 
@@ -42,21 +42,21 @@
 import org.junit.rules.TemporaryFolder;
 
 public class GeodeKafkaTestUtils {
-  protected static ZooKeeperLocalCluster startZooKeeper(Properties zookeeperProperties)
+  public static ZooKeeperLocalCluster startZooKeeper(Properties zookeeperProperties)
       throws IOException, QuorumPeerConfig.ConfigException {
     ZooKeeperLocalCluster zooKeeperLocalCluster = new ZooKeeperLocalCluster(zookeeperProperties);
     zooKeeperLocalCluster.start();
     return zooKeeperLocalCluster;
   }
 
-  protected static KafkaLocalCluster startKafka(Properties kafkaProperties)
+  public static KafkaLocalCluster startKafka(Properties kafkaProperties)
       throws IOException, InterruptedException {
     KafkaLocalCluster kafkaLocalCluster = new KafkaLocalCluster(kafkaProperties);
     kafkaLocalCluster.start();
     return kafkaLocalCluster;
   }
 
-  protected static void createTopic(String topicName, int numPartitions, int replicationFactor) {
+  public static void createTopic(String topicName, int numPartitions, int replicationFactor) {
     KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
         15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
 
@@ -67,14 +67,14 @@
         RackAwareMode.Disabled$.MODULE$);
   }
 
-  protected static void deleteTopic(String topicName) {
+  public static void deleteTopic(String topicName) {
     KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
         15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
     AdminZkClient adminZkClient = new AdminZkClient(zkClient);
     adminZkClient.deleteTopic(topicName);
   }
 
-  protected static Producer<String, String> createProducer() {
+  public static Producer<String, String> createProducer() {
     final Properties props = new Properties();
     props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
@@ -88,7 +88,7 @@
     return producer;
   }
 
-  protected static Properties getZooKeeperProperties(TemporaryFolder temporaryFolder)
+  public static Properties getZooKeeperProperties(TemporaryFolder temporaryFolder)
       throws IOException {
     Properties properties = new Properties();
     properties.setProperty("dataDir", temporaryFolder.newFolder("zookeeper").getAbsolutePath());
@@ -97,7 +97,7 @@
     return properties;
   }
 
-  protected static Properties getKafkaConfig(String logPath) {
+  public static Properties getKafkaConfig(String logPath) {
     int BROKER_PORT = 9092;
     Properties props = new Properties();
     props.put("broker.id", "0");
@@ -129,9 +129,13 @@
     return consumer;
   }
 
-  protected static WorkerAndHerderCluster startWorkerAndHerderCluster(int maxTasks,
-      String sourceRegion, String sinkRegion, String sourceTopic, String sinkTopic,
-      String offsetPath, String locatorString) {
+  public static WorkerAndHerderCluster startWorkerAndHerderCluster(int maxTasks,
+                                                                   String sourceRegion,
+                                                                   String sinkRegion,
+                                                                   String sourceTopic,
+                                                                   String sinkTopic,
+                                                                   String offsetPath,
+                                                                   String locatorString) {
     WorkerAndHerderCluster workerAndHerderCluster = new WorkerAndHerderCluster();
     try {
       workerAndHerderCluster.start(String.valueOf(maxTasks), sourceRegion, sinkRegion, sourceTopic,
@@ -143,7 +147,7 @@
     return workerAndHerderCluster;
   }
 
-  protected static void verifyEventsAreConsumed(Consumer<String, String> consumer, int numEvents) {
+  public static void verifyEventsAreConsumed(Consumer<String, String> consumer, int numEvents) {
     AtomicInteger valueReceived = new AtomicInteger(0);
     await().atMost(10, TimeUnit.SECONDS).until(() -> {
       ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
diff --git a/src/test/java/org/geode/kafka/JavaProcess.java b/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java
similarity index 97%
rename from src/test/java/org/geode/kafka/JavaProcess.java
rename to src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java
index a88638b..c289c80 100644
--- a/src/test/java/org/geode/kafka/JavaProcess.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka;
+package org.apache.geode.kafka.utilities;
 
 import java.io.File;
 import java.io.IOException;
diff --git a/src/test/java/org/geode/kafka/KafkaLocalCluster.java b/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java
similarity index 96%
rename from src/test/java/org/geode/kafka/KafkaLocalCluster.java
rename to src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java
index ee13f8c..338e819 100644
--- a/src/test/java/org/geode/kafka/KafkaLocalCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka;
+package org.apache.geode.kafka.utilities;
 
 import java.io.IOException;
 import java.util.Properties;
diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
similarity index 97%
rename from src/test/java/org/geode/kafka/WorkerAndHerderCluster.java
rename to src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
index 022e381..7c58bc0 100644
--- a/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka;
+package org.apache.geode.kafka.utilities;
 
 import java.io.IOException;
 
diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
similarity index 78%
rename from src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
rename to src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
index a3efc23..8003fc0 100644
--- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
@@ -12,10 +12,9 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka;
+package org.apache.geode.kafka.utilities;
 
-import static org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
-import static org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -32,29 +31,27 @@
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
 import org.apache.kafka.connect.util.ConnectUtils;
-import org.geode.kafka.sink.GeodeKafkaSink;
-import org.geode.kafka.source.GeodeKafkaSource;
+
+import org.apache.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.sink.GeodeKafkaSink;
+import org.apache.geode.kafka.source.GeodeKafkaSource;
+import org.apache.geode.kafka.sink.GeodeSinkConnectorConfig;
 
 public class WorkerAndHerderWrapper {
 
   public static void main(String[] args) throws IOException {
-    String maxTasks = args[0];
-    String offsetPath = "/tmp/connect.offsets";
-    String regionToTopicBinding = GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
-    String topicToRegionBinding = GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
-    String sinkTopic = GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
-    String locatorString = null;
-    System.out.println("MaxTask " + maxTasks);
-    if (args.length == 7) {
-      String sourceRegion = args[1];
-      String sinkRegion = args[2];
-      String sourceTopic = args[3];
-      sinkTopic = args[4];
-      offsetPath = args[5];
-      regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]";
-      topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]";
-      locatorString = args[6];
+    if (args.length != 7) {
+      throw new RuntimeException("Insufficient arguments to start workers and herders");
     }
+    String maxTasks = args[0];
+    String sourceRegion = args[1];
+    String sinkRegion = args[2];
+    String sourceTopic = args[3];
+    String sinkTopic = args[4];
+    String offsetPath = args[5];
+    String regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]";
+    String topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]";
+    String locatorString = args[6];
 
     Map props = new HashMap();
     props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -100,7 +97,7 @@
     sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
     sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
     sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
-    sinkProps.put(TOPIC_TO_REGION_BINDINGS, topicToRegionBinding);
+    sinkProps.put(GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS, topicToRegionBinding);
     sinkProps.put(GeodeConnectorConfig.LOCATORS, locatorString);
     sinkProps.put("topics", sinkTopic);
 
diff --git a/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java b/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java
similarity index 97%
rename from src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java
rename to src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java
index d7cb99a..8e5e7a9 100644
--- a/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.geode.kafka;
+package org.apache.geode.kafka.utilities;
 
 import java.io.IOException;
 import java.util.Properties;
diff --git a/src/test/java/org/geode/kafka/GeodeContextTest.java b/src/test/java/org/geode/kafka/GeodeContextTest.java
deleted file mode 100644
index eb10bee..0000000
--- a/src/test/java/org/geode/kafka/GeodeContextTest.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.geode.kafka;
-
-public class GeodeContextTest {
-}
diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java
deleted file mode 100644
index 57d576d..0000000
--- a/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.geode.kafka;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import kafka.admin.RackAwareMode;
-import kafka.zk.AdminZkClient;
-import kafka.zk.KafkaZkClient;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-
-public class GeodeKafkaTestCluster {
-
-  @ClassRule
-  public static TemporaryFolder temporaryFolder = new TemporaryFolder();
-  private static boolean debug = true;
-
-  public static String TEST_REGION_TO_TOPIC_BINDINGS = "[someRegionForSource:someTopicForSource]";
-  public static String TEST_TOPIC_TO_REGION_BINDINGS = "[someTopicForSink:someRegionForSink]";
-
-  public static String TEST_TOPIC_FOR_SOURCE = "someTopicForSource";
-  public static String TEST_REGION_FOR_SOURCE = "someRegionForSource";
-  public static String TEST_TOPIC_FOR_SINK = "someTopicForSink";
-  public static String TEST_REGION_FOR_SINK = "someRegionForSink";
-
-  private static ZooKeeperLocalCluster zooKeeperLocalCluster;
-  private static KafkaLocalCluster kafkaLocalCluster;
-  private static GeodeLocalCluster geodeLocalCluster;
-  private static WorkerAndHerderCluster workerAndHerderCluster;
-  private static Consumer<String, String> consumer;
-
-  @BeforeClass
-  public static void setup()
-      throws IOException, QuorumPeerConfig.ConfigException, InterruptedException {
-    startZooKeeper();
-    startKafka();
-    startGeode();
-  }
-
-
-  @AfterClass
-  public static void shutdown() {
-    workerAndHerderCluster.stop();
-    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
-        15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
-    // AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-    // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE);
-    // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
-    zkClient.close();
-    kafkaLocalCluster.stop();
-    geodeLocalCluster.stop();
-  }
-
-
-  private static void startWorker(int maxTasks) throws IOException, InterruptedException {
-    workerAndHerderCluster = new WorkerAndHerderCluster();
-    workerAndHerderCluster.start(String.valueOf(maxTasks));
-    Thread.sleep(20000);
-  }
-
-  private static void createTopic(String topicName, int numPartitions, int replicationFactor) {
-    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
-        15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
-
-    Properties topicProperties = new Properties();
-    topicProperties.put("flush.messages", "1");
-    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-    adminZkClient.createTopic(topicName, numPartitions, replicationFactor, topicProperties,
-        RackAwareMode.Disabled$.MODULE$);
-  }
-
-  private static void deleteTopic(String topicName) {
-    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
-        15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
-    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-    adminZkClient.deleteTopic(topicName);
-  }
-
-  private ClientCache createGeodeClient() {
-    return new ClientCacheFactory().addPoolLocator("localhost", 10334).create();
-  }
-
-  private static void startZooKeeper() throws IOException, QuorumPeerConfig.ConfigException {
-    zooKeeperLocalCluster = new ZooKeeperLocalCluster(getZooKeeperProperties());
-    zooKeeperLocalCluster.start();
-  }
-
-  private static void startKafka()
-      throws IOException, InterruptedException, QuorumPeerConfig.ConfigException {
-    kafkaLocalCluster = new KafkaLocalCluster(getKafkaConfig());
-    kafkaLocalCluster.start();
-  }
-
-  private static void startGeode() throws IOException, InterruptedException {
-    geodeLocalCluster = new GeodeLocalCluster();
-    geodeLocalCluster.start();
-  }
-
-  private static Properties getZooKeeperProperties() throws IOException {
-    Properties properties = new Properties();
-    properties.setProperty("dataDir",
-        (debug) ? "/tmp/zookeeper" : temporaryFolder.newFolder("zookeeper").getAbsolutePath());
-    properties.setProperty("clientPort", "2181");
-    properties.setProperty("tickTime", "2000");
-    return properties;
-  }
-
-
-  private static Properties getKafkaConfig() throws IOException {
-    int BROKER_PORT = 9092;
-    Properties props = new Properties();
-
-    props.put("broker.id", "0");
-    props.put("zookeeper.connect", "localhost:2181");
-    props.put("host.name", "localHost");
-    props.put("port", BROKER_PORT);
-    props.put("offsets.topic.replication.factor", "1");
-
-    // Specifically GeodeKafka connector configs
-    return props;
-  }
-
-
-  // consumer props, less important, just for testing?
-  public static Consumer<String, String> createConsumer() {
-    final Properties props = new Properties();
-    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-    props.put(ConsumerConfig.GROUP_ID_CONFIG,
-        "myGroup");
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-        StringDeserializer.class.getName());
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-        StringDeserializer.class.getName());
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    // Create the consumer using props.
-    final Consumer<String, String> consumer =
-        new KafkaConsumer<>(props);
-    // Subscribe to the topic.
-    consumer.subscribe(Collections.singletonList(TEST_TOPIC_FOR_SOURCE));
-    return consumer;
-  }
-
-  // consumer props, less important, just for testing?
-  public static Producer<String, String> createProducer() {
-    final Properties props = new Properties();
-    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-        StringSerializer.class.getName());
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-        StringSerializer.class.getName());
-
-    // Create the producer using props.
-    final Producer<String, String> producer =
-        new KafkaProducer<>(props);
-    return producer;
-  }
-
-  @Test
-  public void endToEndSourceTest() throws Exception {
-    try {
-      createTopic(TEST_TOPIC_FOR_SOURCE, 1, 1);
-      startWorker(1);
-      consumer = createConsumer();
-
-      ClientCache client = createGeodeClient();
-      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY)
-          .create(TEST_REGION_FOR_SOURCE);
-
-      for (int i = 0; i < 10; i++) {
-        region.put("KEY" + i, "VALUE" + i);
-      }
-
-      AtomicInteger valueReceived = new AtomicInteger(0);
-      await().atMost(10, TimeUnit.SECONDS).until(() -> {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
-        for (ConsumerRecord<String, String> record : records) {
-          valueReceived.incrementAndGet();
-        }
-        return valueReceived.get() == 10;
-      });
-    } finally {
-      deleteTopic(TEST_TOPIC_FOR_SOURCE);
-    }
-  }
-
-
-  @Test
-  public void endToEndSourceSingleRegionMultiTaskMultiPartitionTest() throws Exception {
-    try {
-      createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1);
-      startWorker(1);
-      consumer = createConsumer();
-
-      ClientCache client = createGeodeClient();
-      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY)
-          .create(TEST_REGION_FOR_SOURCE);
-
-      for (int i = 0; i < 10; i++) {
-        region.put("KEY" + i, "VALUE" + i);
-      }
-
-      AtomicInteger valueReceived = new AtomicInteger(0);
-      await().atMost(10, TimeUnit.SECONDS).until(() -> {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
-        for (ConsumerRecord<String, String> record : records) {
-          valueReceived.incrementAndGet();
-        }
-        return valueReceived.get() == 10;
-      });
-    } finally {
-      deleteTopic(TEST_TOPIC_FOR_SOURCE);
-    }
-  }
-
-  @Test
-  public void endToEndSourceSingleRegionMultiTaskMultiPartitionWithMoreTasksThanPartitionsTest()
-      throws Exception {
-    try {
-      createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1);
-      startWorker(5);
-      consumer = createConsumer();
-
-      ClientCache client = createGeodeClient();
-      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY)
-          .create(TEST_REGION_FOR_SOURCE);
-
-      for (int i = 0; i < 10; i++) {
-        region.put("KEY" + i, "VALUE" + i);
-      }
-
-      AtomicInteger valueReceived = new AtomicInteger(0);
-      await().atMost(10, TimeUnit.SECONDS).until(() -> {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
-        for (ConsumerRecord<String, String> record : records) {
-          valueReceived.incrementAndGet();
-        }
-        return valueReceived.get() == 10;
-      });
-    } finally {
-      deleteTopic(TEST_TOPIC_FOR_SOURCE);
-    }
-  }
-
-  @Test
-  public void endToEndSinkTest() throws Exception {
-    try {
-      createTopic(TEST_TOPIC_FOR_SINK, 1, 1);
-      startWorker(1);
-      consumer = createConsumer();
-
-      ClientCache client = createGeodeClient();
-      Region region =
-          client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
-
-      Producer<String, String> producer = createProducer();
-      for (int i = 0; i < 10; i++) {
-        producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i));
-      }
-
-      int i = 0;
-      await().atMost(10, TimeUnit.SECONDS)
-          .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
-    } finally {
-      deleteTopic(TEST_TOPIC_FOR_SINK);
-    }
-  }
-
-
-  @Test
-  public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicSinkTest()
-      throws Exception {
-    try {
-      createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
-      startWorker(5);
-      consumer = createConsumer();
-
-      ClientCache client = createGeodeClient();
-      Region region =
-          client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
-
-      Producer<String, String> producer = createProducer();
-      for (int i = 0; i < 10; i++) {
-        producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i));
-      }
-
-      int i = 0;
-      await().atMost(10, TimeUnit.SECONDS)
-          .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
-    } finally {
-      deleteTopic(TEST_TOPIC_FOR_SINK);
-    }
-  }
-
-  @Test
-  public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicWithMoreWorkersSinkTest()
-      throws Exception {
-    try {
-      createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
-      startWorker(15);
-      consumer = createConsumer();
-
-      ClientCache client = createGeodeClient();
-      Region region =
-          client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
-
-      Producer<String, String> producer = createProducer();
-      for (int i = 0; i < 10; i++) {
-        producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i));
-      }
-
-      int i = 0;
-      await().atMost(10, TimeUnit.SECONDS)
-          .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
-    } finally {
-      deleteTopic(TEST_TOPIC_FOR_SINK);
-    }
-  }
-
-  @Test
-  public void endToEndWithOneTaskForASingleBindingLessTasksThanPartitions() throws Exception {
-    try {
-      createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
-      startWorker(5);
-      consumer = createConsumer();
-
-      ClientCache client = createGeodeClient();
-      Region region =
-          client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
-
-      Producer<String, String> producer = createProducer();
-      for (int i = 0; i < 10; i++) {
-        producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i));
-      }
-
-      int i = 0;
-      await().atMost(10, TimeUnit.SECONDS)
-          .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
-    } finally {
-      deleteTopic(TEST_TOPIC_FOR_SINK);
-    }
-  }
-
-  @Test
-  public void endToEndWithOneTaskForASingleBindingMoreTasksThanPartitions() throws Exception {
-    try {
-      createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
-      startWorker(5);
-      consumer = createConsumer();
-
-      ClientCache client = createGeodeClient();
-      Region region =
-          client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
-
-      Producer<String, String> producer = createProducer();
-      for (int i = 0; i < 10; i++) {
-        producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, i, UUID.randomUUID().toString(),
-            UUID.randomUUID().toString()));
-      }
-
-      int i = 0;
-      await().atMost(10, TimeUnit.SECONDS)
-          .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
-    } finally {
-      deleteTopic(TEST_TOPIC_FOR_SINK);
-    }
-  }
-
-}
diff --git a/src/test/java/org/geode/kafka/GeodeLocalCluster.java b/src/test/java/org/geode/kafka/GeodeLocalCluster.java
deleted file mode 100644
index 6784391..0000000
--- a/src/test/java/org/geode/kafka/GeodeLocalCluster.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.geode.kafka;
-
-import java.io.IOException;
-
-public class GeodeLocalCluster {
-
-  private JavaProcess locatorProcess;
-  private JavaProcess serverProcess;
-
-  public GeodeLocalCluster() {
-    locatorProcess = new JavaProcess(LocatorLauncherWrapper.class);
-    serverProcess = new JavaProcess(ServerLauncherWrapper.class);
-  }
-
-  public void start() throws IOException, InterruptedException {
-    System.out.println("starting locator");
-    locatorProcess.exec("10334");
-    Thread.sleep(15000);
-    serverProcess.exec("40404");
-    Thread.sleep(30000);
-  }
-
-  public void stop() {
-    serverProcess.destroy();
-    locatorProcess.destroy();
-  }
-}
diff --git a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
deleted file mode 100644
index f24367c..0000000
--- a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.geode.kafka;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.Locator;
-
-public class LocatorLauncherWrapper {
-
-  public static void main(String[] args) throws IOException {
-    Properties properties = new Properties();
-    // String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
-    // properties.setProperty(ConfigurationPropert/**/ies.STATISTIC_ARCHIVE_FILE, statsFile);
-    properties.setProperty(ConfigurationProperties.NAME, "locator1");
-
-    Locator.startLocatorAndDS(10334,
-        null/* new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log") */, properties);
-    while (true) {
-
-    }
-    //
-    // LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
-    // .setMemberName("locator1")
-    //// .setPort(Integer.valueOf(args[0]))
-    //// .setBindAddress("localhost")
-    // .build();
-    //
-    // locatorLauncher.start();
-    // while (!locatorLauncher.isRunning()) {
-    //
-    // }
-    // System.out.println(locatorLauncher.getBindAddress() + ":" + locatorLauncher.getPort());
-
-  }
-}
diff --git a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
deleted file mode 100644
index 4ab75cd..0000000
--- a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.geode.kafka;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.distributed.ConfigurationProperties;
-
-public class ServerLauncherWrapper {
-
-  public static void main(String... args) throws IOException {
-    // ServerLauncher serverLauncher = new ServerLauncher.Builder()
-    // .setMemberName("server1")
-    //// .setServerPort(Integer.valueOf(args[0]))
-    //// .setServerBindAddress("localhost")
-    // // .set("locators", "localhost[10334]")
-    //// .set("jmx-manager", "true")
-    //// .set("jmx-manager-start", "true")
-    // .build();
-    //
-    // serverLauncher.start();
-    // System.out.println("Geode Server Launcher complete");
-
-
-
-    Properties properties = new Properties();
-    String locatorString = "localhost[10334]";
-    // String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
-    Cache cache = new CacheFactory(properties)
-        // .setPdxSerializer(new ReflectionBasedAutoSerializer("benchmark.geode.data.*"))
-        .set(ConfigurationProperties.LOCATORS, locatorString)
-        .set(ConfigurationProperties.NAME,
-            "server-1")
-        // .set(ConfigurationProperties.LOG_FILE,
-        // "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
-        .set(ConfigurationProperties.LOG_LEVEL, "info")
-        // .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile)
-        .create();
-    CacheServer cacheServer = cache.addCacheServer();
-    cacheServer.setPort(0);
-    cacheServer.start();
-
-    // create the region
-    cache.createRegionFactory(RegionShortcut.PARTITION).create(
-        GeodeKafkaTestCluster.TEST_REGION_FOR_SINK);
-    cache.createRegionFactory(RegionShortcut.PARTITION).create(
-        GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE);
-    System.out.println("starting cacheserver");
-    while (true) {
-
-    }
-  }
-}