Fixing the sink test issue
diff --git a/.gitignore b/.gitignore
index 80b701f..2e856f5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -235,4 +235,4 @@
.nb-gradle/
.idea/
-./dunit/
+**/dunit
diff --git a/dunit/locator/locator63530view.dat b/dunit/locator/locator63530view.dat
deleted file mode 100644
index 10a68c2..0000000
--- a/dunit/locator/locator63530view.dat
+++ /dev/null
Binary files differ
diff --git a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java
index 5c94575..b0a11e6 100644
--- a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java
+++ b/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -1,7 +1,6 @@
package org.geode.kafka;
import static org.awaitility.Awaitility.await;
-import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer;
import static org.geode.kafka.GeodeKafkaTestUtils.createProducer;
import static org.geode.kafka.GeodeKafkaTestUtils.createTopic;
import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic;
@@ -10,15 +9,12 @@
import static org.geode.kafka.GeodeKafkaTestUtils.startKafka;
import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper;
-import static org.geode.kafka.GeodeKafkaTestUtils.verifyEventsAreConsumed;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
-import com.fasterxml.jackson.module.scala.ser.SymbolSerializerModule;
import kafka.zk.KafkaZkClient;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Time;
@@ -39,7 +35,7 @@
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
-//@RunWith(Parameterized.class)
+@RunWith(Parameterized.class)
public class GeodeAsSinkDUnitTest {
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
@@ -82,13 +78,13 @@
return Arrays.asList(new Object[][] {{1, 1}, {5, 10}, {15, 10}});
}
- private /*final*/ int numTask = 1;
- private /*final*/ int numPartition = 1;
+ private final int numTask;
+ private final int numPartition;
-// public GeodeAsSinkDUnitTest(int numTask, int numPartition) {
-// this.numTask = numTask;
-// this.numPartition = numPartition;
-// }
+ public GeodeAsSinkDUnitTest(int numTask, int numPartition) {
+ this.numTask = numTask;
+ this.numPartition = numPartition;
+ }
@Test
public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception {
@@ -161,5 +157,5 @@
kafkaLocalCluster.stop();
}
- }
+ }
}
diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
index 27f9391..abe0d6a 100644
--- a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
+++ b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
@@ -148,7 +148,6 @@
await().atMost(10, TimeUnit.SECONDS).until(() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records) {
- System.out.println("NABA :: " + record);
valueReceived.incrementAndGet();
}
return valueReceived.get() == numEvents;
diff --git a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
index f0a0a1d..f24367c 100644
--- a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
+++ b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
@@ -14,7 +14,6 @@
*/
package org.geode.kafka;
-import java.io.File;
import java.io.IOException;
import java.util.Properties;
@@ -30,7 +29,7 @@
properties.setProperty(ConfigurationProperties.NAME, "locator1");
Locator.startLocatorAndDS(10334,
- null/*new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log")*/, properties);
+ null/* new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log") */, properties);
while (true) {
}
diff --git a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
index 026012c..4ab75cd 100644
--- a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
@@ -48,8 +48,8 @@
.set(ConfigurationProperties.LOCATORS, locatorString)
.set(ConfigurationProperties.NAME,
"server-1")
-// .set(ConfigurationProperties.LOG_FILE,
-// "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
+ // .set(ConfigurationProperties.LOG_FILE,
+ // "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
.set(ConfigurationProperties.LOG_LEVEL, "info")
// .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile)
.create();
diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
index 0d8ad40..a3efc23 100644
--- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
@@ -18,7 +18,6 @@
import static org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
import java.io.IOException;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -43,19 +42,18 @@
String offsetPath = "/tmp/connect.offsets";
String regionToTopicBinding = GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
String topicToRegionBinding = GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
- String testTopicForSink = GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
+ String sinkTopic = GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
String locatorString = null;
System.out.println("MaxTask " + maxTasks);
if (args.length == 7) {
String sourceRegion = args[1];
String sinkRegion = args[2];
String sourceTopic = args[3];
- String sinkTopic = args[4];
+ sinkTopic = args[4];
offsetPath = args[5];
regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]";
topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]";
locatorString = args[6];
- System.out.println("NABA args = " + Arrays.deepToString(args));
}
Map props = new HashMap();
@@ -104,8 +102,7 @@
sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
sinkProps.put(TOPIC_TO_REGION_BINDINGS, topicToRegionBinding);
sinkProps.put(GeodeConnectorConfig.LOCATORS, locatorString);
- System.out.println("NABA : binding " + topicToRegionBinding);
- sinkProps.put("topics", testTopicForSink);
+ sinkProps.put("topics", sinkTopic);
herder.putConnectorConfig(
sinkProps.get(ConnectorConfig.NAME_CONFIG),