Removal of unused variables
diff --git a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
index 073e5ea..9d64a03 100644
--- a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
@@ -72,7 +72,7 @@
region.putAll(updateMap);
region.removeAll(removeList);
} else {
- logger.info("Unable to locate proxy region: " + region);
+ logger.info("Unable to locate proxy region is null");
}
}
}
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 2d5abe4..8f26c1e 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -76,7 +76,6 @@
eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
regionToTopics = geodeConnectorConfig.getRegionToTopics();
- geodeConnectorConfig.getCqsToRegister();
sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
String cqPrefix = geodeConnectorConfig.getCqPrefix();
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
index 03b07aa..65747e2 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -152,7 +152,9 @@
if (workerAndHerderCluster != null) {
workerAndHerderCluster.stop();
}
- kafkaLocalCluster.stop();
+ if (kafkaLocalCluster != null) {
+ kafkaLocalCluster.stop();
+ }
}
}
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
index 44ef67b..018a9b9 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
@@ -163,7 +163,9 @@
if (workerAndHerderCluster != null) {
workerAndHerderCluster.stop();
}
- kafkaLocalCluster.stop();
+ if (kafkaLocalCluster != null) {
+ kafkaLocalCluster.stop();
+ }
}
}
}
diff --git a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
index 7c6adef..9e46478 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
@@ -52,7 +52,7 @@
List<String> regionNames = config.parseStringByComma("region1, region2, region3,region4");
assertEquals(4, regionNames.size());
assertThat(true,
- allOf(is(regionNames instanceof List), is(regionNames.contains("region1")),
+ allOf(is(regionNames.contains("region1")),
is(regionNames.contains("region2")), is(regionNames.contains("region3")),
is(regionNames.contains("region4"))));
}
diff --git a/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
index f59ab7b..9471f48 100644
--- a/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
@@ -53,13 +53,12 @@
@Test
public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() {
- boolean nullValuesMeanRemove = false;
Map updates = mock(Map.class);
Collection removes = mock(Collection.class);
when(removes.contains(any())).thenReturn(true);
BatchRecords records = new BatchRecords(updates, removes);
SinkRecord sinkRecord = mock(SinkRecord.class);
- records.addUpdateOperation(sinkRecord, nullValuesMeanRemove);
+ records.addUpdateOperation(sinkRecord, false);
verify(removes, times(0)).remove(any());
}
diff --git a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
index b38a597..c71d7ba 100644
--- a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -46,9 +46,8 @@
@Test
public void putRecordsAddsToRegionBatchRecords() {
- boolean nullMeansRemove = true;
GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
- HashMap<String, String> props = createTestSinkProps(nullMeansRemove);
+ HashMap<String, String> props = createTestSinkProps(true);
SinkRecord topicRecord = mock(SinkRecord.class);
when(topicRecord.topic()).thenReturn("topic");
@@ -68,14 +67,13 @@
task.put(records, batchRecordsMap);
assertTrue(batchRecordsMap.containsKey("region"));
- verify(batchRecords, times(1)).addUpdateOperation(topicRecord, nullMeansRemove);
+ verify(batchRecords, times(1)).addUpdateOperation(topicRecord, true);
}
@Test
public void newBatchRecordsAreCreatedIfOneDoesntExist() {
- boolean nullMeansRemove = true;
GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
- HashMap<String, String> props = createTestSinkProps(nullMeansRemove);
+ HashMap<String, String> props = createTestSinkProps(true);
SinkRecord topicRecord = mock(SinkRecord.class);
when(topicRecord.topic()).thenReturn("topic");
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index a8d6122..07c9f0b 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -53,8 +53,6 @@
public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() {
GeodeContext geodeContext = mock(GeodeContext.class);
BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
- boolean loadEntireRegion = true;
- boolean isDurable = false;
CqResults fakeInitialResults = new ResultsBag();
for (int i = 0; i < 10; i++) {
fakeInitialResults.add(mock(Struct.class));
@@ -64,7 +62,7 @@
.thenReturn(fakeInitialResults);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
- "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ "testRegion", DEFAULT_CQ_PREFIX, true, false);
assertEquals(10, eventBuffer.size());
}
@@ -72,8 +70,6 @@
public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
GeodeContext geodeContext = mock(GeodeContext.class);
BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
- boolean loadEntireRegion = false;
- boolean isDurable = false;
CqResults fakeInitialResults = new ResultsBag();
for (int i = 0; i < 10; i++) {
fakeInitialResults.add(mock(CqEvent.class));
@@ -83,7 +79,7 @@
.thenReturn(fakeInitialResults);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
- "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ "testRegion", DEFAULT_CQ_PREFIX, false, false);
assertEquals(0, eventBuffer.size());
}
@@ -91,15 +87,13 @@
public void cqListenerOnEventPopulatesEventsBuffer() {
GeodeContext geodeContext = mock(GeodeContext.class);
BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
- boolean loadEntireRegion = false;
- boolean isDurable = false;
when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
.thenReturn(mock(CqResults.class));
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
GeodeKafkaSourceListener listener =
task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
- "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ "testRegion", DEFAULT_CQ_PREFIX, false, false);
listener.onEvent(mock(CqEvent.class));
assertEquals(1, eventBuffer.size());