[Edgent-441] adjust kafka test timeouts
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
index f5f879f..3e280f4 100644
--- a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
@@ -157,6 +157,7 @@
}
synchronized void start(KafkaSubscriber<?> subscriber) {
+ trace.info("{} starting consumer", id());
Map<String,Integer> topicCountMap = new HashMap<>();
int threadsPerTopic = 1;
int totThreadCnt = 0;
@@ -176,9 +177,10 @@
String topic = entry.getKey();
int threadNum = 0;
for (KafkaStream<byte[],byte[]> stream : entry.getValue()) {
+ final int fThreadNum = threadNum++;
executor.submit(() -> {
try {
- trace.info("{} started consumer thread {} for topic:{}", id(), threadNum, topic);
+ trace.info("{} started consumer thread {} for topic:{}", id(), fThreadNum, topic);
ConsumerIterator<byte[],byte[]> it = stream.iterator();
while (it.hasNext()) {
subscriber.accept(it.next());
@@ -193,7 +195,7 @@
trace.error("{} consumer for topic:{}. got exception", id(), topic, t);
}
finally {
- trace.info("{} consumer thread {} for topic:{} exiting.", id(), threadNum, topic);
+ trace.info("{} consumer thread {} for topic:{} exiting.", id(), fThreadNum, topic);
}
});
}
diff --git a/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
index 74e5b63..11ec7dd 100644
--- a/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
+++ b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
@@ -80,8 +80,8 @@
* }</pre>
*/
public class KafkaStreamsTestManual extends ConnectorTestBase {
- private static final int PUB_DELAY_MSEC = 4*1000;
- private static final int SEC_TIMEOUT = 10;
+ private static final int PUB_DELAY_MSEC = 15*1000; // have seen 12sec 1st test's consumer startup delay
+ private static final int SEC_TIMEOUT = 20;
private final String BASE_GROUP_ID = "kafkaStreamsTestGroupId";
private final String uniq = simpleTS();
private final String msg1 = "Hello";