Converting class-level fields that are only used in a single method to local variables
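
The fields removed below are each only used inside one method, so they can be declared as locals there instead of living on the object. A minimal sketch of the pattern, using hypothetical Worker/Config names rather than the patched classes:

    // Hypothetical illustration only; Config and Worker are not part of this patch.
    interface Config {
      int getTaskId();
    }

    // Before: the value is stored in a field although only configure() uses it.
    class WorkerBefore {
      private int taskId;                  // kept for the whole object lifetime

      void configure(Config config) {
        taskId = config.getTaskId();
        System.out.println("task " + taskId + " starting");
      }
    }

    // After: no field; the value is scoped to the method that uses it.
    class WorkerAfter {
      void configure(Config config) {
        int taskId = config.getTaskId();   // discarded when configure() returns
        System.out.println("task " + taskId + " starting");
      }
    }

In the DUnit tests the former static field client becomes a local. Because the lambda in startClientVM(2, client -> client.withLocatorConnection(locatorPort)) already declares a parameter named client, and a lambda parameter may shadow a field but cannot reuse the name of a local variable in the enclosing method, the converted local is renamed to client1.
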
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index eaf0f66..daf2274 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -40,7 +40,6 @@
   private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class);
 
   private GeodeContext geodeContext;
-  private int taskId;
   private Map<String, List<String>> topicToRegions;
   private Map<String, Region> regionNameToRegion;
   private boolean nullValuesMeansRemove = true;
@@ -73,7 +72,7 @@
 
   void configure(GeodeSinkConnectorConfig geodeConnectorConfig) {
     logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
-    taskId = geodeConnectorConfig.getTaskId();
+    int taskId = geodeConnectorConfig.getTaskId();
     topicToRegions = geodeConnectorConfig.getTopicToRegions();
     nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
   }
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 4e5b415..2d5abe4 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -43,7 +43,6 @@
   private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
 
   private GeodeContext geodeContext;
-  private GeodeSourceConnectorConfig geodeConnectorConfig;
   private EventBufferSupplier eventBufferSupplier;
   private Map<String, List<String>> regionToTopics;
   private Map<String, Map<String, String>> sourcePartitions;
@@ -64,7 +63,7 @@
   @Override
   public void start(Map<String, String> props) {
     try {
-      geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
+      GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
       logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
index ac70051..a004f23 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -54,7 +54,6 @@
   public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
 
   private final String durableClientId;
-  private final String durableClientIdPrefix;
   private final String durableClientTimeout;
   private final String cqPrefix;
   private final boolean loadEntireRegion;
@@ -68,7 +67,7 @@
     super(SOURCE_CONFIG_DEF, connectorProperties);
     cqsToRegister = parseRegionToTopics(getString(CQS_TO_REGISTER)).keySet();
     regionToTopics = parseRegionToTopics(getString(REGION_TO_TOPIC_BINDINGS));
-    durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX);
+    String durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX);
     if (isDurable(durableClientIdPrefix)) {
       durableClientId = durableClientIdPrefix + taskId;
     } else {
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
index 931de80..d8f1ab5 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -42,9 +42,6 @@
   @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
 
-  private static MemberVM locator, server;
-  private static ClientVM client;
-
   @Rule
   public TestName testName = new TestName();
 
@@ -91,12 +88,11 @@
   @Test
   public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception {
 
-    locator = clusterStartupRule.startLocatorVM(0, 10334);
+    MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334);
     int locatorPort = locator.getPort();
-    server = clusterStartupRule.startServerVM(1, locatorPort);
-    client =
-        clusterStartupRule
-            .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
+    MemberVM server = clusterStartupRule.startServerVM(1, locatorPort);
+    ClientVM client1 = clusterStartupRule
+        .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
     int NUM_EVENT = 10;
 
     // Set unique names for all the different components
@@ -132,7 +128,7 @@
       workerAndHerderCluster = startWorkerAndHerderCluster(numTask, sourceRegion, sinkRegion,
           sourceTopic, sinkTopic, temporaryFolderForOffset.getRoot().getAbsolutePath(),
           "localhost[" + locatorPort + "]");
-      client.invoke(() -> {
+      client1.invoke(() -> {
         ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
             .create(sinkRegion);
       });
@@ -143,7 +139,7 @@
         producer.send(new ProducerRecord(sinkTopic, "KEY" + i, "VALUE" + i));
       }
 
-      client.invoke(() -> {
+      client1.invoke(() -> {
         Region region = ClusterStartupRule.getClientCache().getRegion(sinkRegion);
         await().atMost(10, TimeUnit.SECONDS)
             .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
index 8b41d3e..7a0f05f 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
@@ -54,9 +54,6 @@
   @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
 
-  private static MemberVM locator, server;
-  private static ClientVM client;
-
 
   @Rule
   public TestName testName = new TestName();
@@ -103,12 +100,11 @@
 
   @Test
   public void whenDataIsInsertedInGeodeSourceThenKafkaConsumerMustReceiveEvents() throws Exception {
-    locator = clusterStartupRule.startLocatorVM(0, 10334);
+    MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334);
     int locatorPort = locator.getPort();
-    server = clusterStartupRule.startServerVM(1, locatorPort);
-    client =
-        clusterStartupRule
-            .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
+    MemberVM server = clusterStartupRule.startServerVM(1, locatorPort);
+    ClientVM client1 = clusterStartupRule
+        .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
     int NUM_EVENT = 10;
 
     // Set unique names for all the different components
@@ -128,7 +124,7 @@
       ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
           .create(sinkRegion);
     });
-    client.invoke(() -> {
+    client1.invoke(() -> {
       ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
           .create(sourceRegion);
     });
@@ -152,7 +148,7 @@
       Consumer<String, String> consumer = createConsumer(sourceTopic);
 
       // Insert data into the Apache Geode source from the client
-      client.invoke(() -> {
+      client1.invoke(() -> {
         Region region = ClusterStartupRule.getClientCache().getRegion(sourceRegion);
         for (int i = 0; i < NUM_EVENT; i++) {
           region.put("KEY" + i, "VALUE" + i);