Removal of unused variables
diff --git a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
index 073e5ea..9d64a03 100644
--- a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
@@ -72,7 +72,7 @@
       region.putAll(updateMap);
       region.removeAll(removeList);
     } else {
-      logger.info("Unable to locate proxy region: " + region);
+      logger.info("Unable to locate proxy region is null");
     }
   }
 }
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 2d5abe4..8f26c1e 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -76,7 +76,6 @@
       eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
 
       regionToTopics = geodeConnectorConfig.getRegionToTopics();
-      geodeConnectorConfig.getCqsToRegister();
       sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
 
       String cqPrefix = geodeConnectorConfig.getCqPrefix();
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
index 03b07aa..65747e2 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -152,7 +152,9 @@
       if (workerAndHerderCluster != null) {
         workerAndHerderCluster.stop();
       }
-      kafkaLocalCluster.stop();
+      if (kafkaLocalCluster != null) {
+        kafkaLocalCluster.stop();
+      }
     }
 
   }
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
index 44ef67b..018a9b9 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
@@ -163,7 +163,9 @@
       if (workerAndHerderCluster != null) {
         workerAndHerderCluster.stop();
       }
-      kafkaLocalCluster.stop();
+      if (kafkaLocalCluster != null) {
+        kafkaLocalCluster.stop();
+      }
     }
   }
 }
diff --git a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
index 7c6adef..9e46478 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
@@ -52,7 +52,7 @@
     List<String> regionNames = config.parseStringByComma("region1, region2, region3,region4");
     assertEquals(4, regionNames.size());
     assertThat(true,
-        allOf(is(regionNames instanceof List), is(regionNames.contains("region1")),
+        allOf(is(regionNames.contains("region1")),
             is(regionNames.contains("region2")), is(regionNames.contains("region3")),
             is(regionNames.contains("region4"))));
   }
diff --git a/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
index f59ab7b..9471f48 100644
--- a/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
@@ -53,13 +53,12 @@
 
   @Test
   public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() {
-    boolean nullValuesMeanRemove = false;
     Map updates = mock(Map.class);
     Collection removes = mock(Collection.class);
     when(removes.contains(any())).thenReturn(true);
     BatchRecords records = new BatchRecords(updates, removes);
     SinkRecord sinkRecord = mock(SinkRecord.class);
-    records.addUpdateOperation(sinkRecord, nullValuesMeanRemove);
+    records.addUpdateOperation(sinkRecord, false);
     verify(removes, times(0)).remove(any());
   }
 
diff --git a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
index b38a597..c71d7ba 100644
--- a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -46,9 +46,8 @@
 
   @Test
   public void putRecordsAddsToRegionBatchRecords() {
-    boolean nullMeansRemove = true;
     GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
-    HashMap<String, String> props = createTestSinkProps(nullMeansRemove);
+    HashMap<String, String> props = createTestSinkProps(true);
 
     SinkRecord topicRecord = mock(SinkRecord.class);
     when(topicRecord.topic()).thenReturn("topic");
@@ -68,14 +67,13 @@
 
     task.put(records, batchRecordsMap);
     assertTrue(batchRecordsMap.containsKey("region"));
-    verify(batchRecords, times(1)).addUpdateOperation(topicRecord, nullMeansRemove);
+    verify(batchRecords, times(1)).addUpdateOperation(topicRecord, true);
   }
 
   @Test
   public void newBatchRecordsAreCreatedIfOneDoesntExist() {
-    boolean nullMeansRemove = true;
     GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
-    HashMap<String, String> props = createTestSinkProps(nullMeansRemove);
+    HashMap<String, String> props = createTestSinkProps(true);
 
     SinkRecord topicRecord = mock(SinkRecord.class);
     when(topicRecord.topic()).thenReturn("topic");
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index a8d6122..07c9f0b 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -53,8 +53,6 @@
   public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() {
     GeodeContext geodeContext = mock(GeodeContext.class);
     BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
-    boolean loadEntireRegion = true;
-    boolean isDurable = false;
     CqResults fakeInitialResults = new ResultsBag();
     for (int i = 0; i < 10; i++) {
       fakeInitialResults.add(mock(Struct.class));
@@ -64,7 +62,7 @@
         .thenReturn(fakeInitialResults);
     GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
     task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
-        "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+        "testRegion", DEFAULT_CQ_PREFIX, true, false);
     assertEquals(10, eventBuffer.size());
   }
 
@@ -72,8 +70,6 @@
   public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
     GeodeContext geodeContext = mock(GeodeContext.class);
     BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
-    boolean loadEntireRegion = false;
-    boolean isDurable = false;
     CqResults fakeInitialResults = new ResultsBag();
     for (int i = 0; i < 10; i++) {
       fakeInitialResults.add(mock(CqEvent.class));
@@ -83,7 +79,7 @@
         .thenReturn(fakeInitialResults);
     GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
     task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
-        "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+        "testRegion", DEFAULT_CQ_PREFIX, false, false);
     assertEquals(0, eventBuffer.size());
   }
 
@@ -91,15 +87,13 @@
   public void cqListenerOnEventPopulatesEventsBuffer() {
     GeodeContext geodeContext = mock(GeodeContext.class);
     BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
-    boolean loadEntireRegion = false;
-    boolean isDurable = false;
 
     when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
         .thenReturn(mock(CqResults.class));
     GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
     GeodeKafkaSourceListener listener =
         task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
-            "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+            "testRegion", DEFAULT_CQ_PREFIX, false, false);
 
     listener.onEvent(mock(CqEvent.class));
     assertEquals(1, eventBuffer.size());