Converting global variables to local
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index eaf0f66..daf2274 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -40,7 +40,6 @@
private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class);
private GeodeContext geodeContext;
- private int taskId;
private Map<String, List<String>> topicToRegions;
private Map<String, Region> regionNameToRegion;
private boolean nullValuesMeansRemove = true;
@@ -73,7 +72,7 @@
void configure(GeodeSinkConnectorConfig geodeConnectorConfig) {
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
- taskId = geodeConnectorConfig.getTaskId();
+ int taskId = geodeConnectorConfig.getTaskId();
topicToRegions = geodeConnectorConfig.getTopicToRegions();
nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
}
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 4e5b415..2d5abe4 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -43,7 +43,6 @@
private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
private GeodeContext geodeContext;
- private GeodeSourceConnectorConfig geodeConnectorConfig;
private EventBufferSupplier eventBufferSupplier;
private Map<String, List<String>> regionToTopics;
private Map<String, Map<String, String>> sourcePartitions;
@@ -64,7 +63,7 @@
@Override
public void start(Map<String, String> props) {
try {
- geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
+ GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
index ac70051..a004f23 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -54,7 +54,6 @@
public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
private final String durableClientId;
- private final String durableClientIdPrefix;
private final String durableClientTimeout;
private final String cqPrefix;
private final boolean loadEntireRegion;
@@ -68,7 +67,7 @@
super(SOURCE_CONFIG_DEF, connectorProperties);
cqsToRegister = parseRegionToTopics(getString(CQS_TO_REGISTER)).keySet();
regionToTopics = parseRegionToTopics(getString(REGION_TO_TOPIC_BINDINGS));
- durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX);
+ String durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX);
if (isDurable(durableClientIdPrefix)) {
durableClientId = durableClientIdPrefix + taskId;
} else {
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
index 931de80..d8f1ab5 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -42,9 +42,6 @@
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
- private static MemberVM locator, server;
- private static ClientVM client;
-
@Rule
public TestName testName = new TestName();
@@ -91,12 +88,11 @@
@Test
public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception {
- locator = clusterStartupRule.startLocatorVM(0, 10334);
+ MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334);
int locatorPort = locator.getPort();
- server = clusterStartupRule.startServerVM(1, locatorPort);
- client =
- clusterStartupRule
- .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
+ MemberVM server = clusterStartupRule.startServerVM(1, locatorPort);
+ ClientVM client1 = clusterStartupRule
+ .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
int NUM_EVENT = 10;
// Set unique names for all the different components
@@ -132,7 +128,7 @@
workerAndHerderCluster = startWorkerAndHerderCluster(numTask, sourceRegion, sinkRegion,
sourceTopic, sinkTopic, temporaryFolderForOffset.getRoot().getAbsolutePath(),
"localhost[" + locatorPort + "]");
- client.invoke(() -> {
+ client1.invoke(() -> {
ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
.create(sinkRegion);
});
@@ -143,7 +139,7 @@
producer.send(new ProducerRecord(sinkTopic, "KEY" + i, "VALUE" + i));
}
- client.invoke(() -> {
+ client1.invoke(() -> {
Region region = ClusterStartupRule.getClientCache().getRegion(sinkRegion);
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
index 8b41d3e..7a0f05f 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
@@ -54,9 +54,6 @@
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
- private static MemberVM locator, server;
- private static ClientVM client;
-
@Rule
public TestName testName = new TestName();
@@ -103,12 +100,11 @@
@Test
public void whenDataIsInsertedInGeodeSourceThenKafkaConsumerMustReceiveEvents() throws Exception {
- locator = clusterStartupRule.startLocatorVM(0, 10334);
+ MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334);
int locatorPort = locator.getPort();
- server = clusterStartupRule.startServerVM(1, locatorPort);
- client =
- clusterStartupRule
- .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
+ MemberVM server = clusterStartupRule.startServerVM(1, locatorPort);
+ ClientVM client1 = clusterStartupRule
+ .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
int NUM_EVENT = 10;
// Set unique names for all the different components
@@ -128,7 +124,7 @@
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
.create(sinkRegion);
});
- client.invoke(() -> {
+ client1.invoke(() -> {
ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
.create(sourceRegion);
});
@@ -152,7 +148,7 @@
Consumer<String, String> consumer = createConsumer(sourceTopic);
// Insert data into the Apache Geode source from the client
- client.invoke(() -> {
+ client1.invoke(() -> {
Region region = ClusterStartupRule.getClientCache().getRegion(sourceRegion);
for (int i = 0; i < NUM_EVENT; i++) {
region.put("KEY" + i, "VALUE" + i);