[INLONG-7646][Audit] Fix NPE when mq configuration is not registered (#7647)
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index fe722a4..c23c231 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -37,6 +37,7 @@
import org.apache.inlong.audit.service.consume.KafkaConsume;
import org.apache.inlong.audit.service.consume.PulsarConsume;
import org.apache.inlong.audit.service.consume.TubeConsume;
+import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
import org.apache.inlong.common.pojo.audit.MQInfo;
import org.slf4j.Logger;
@@ -45,7 +46,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
@@ -70,6 +70,9 @@
private static final String DEFAULT_CONFIG_PROPERTIES = "application.properties";
+ // interval time of getting mq config
+ private static final int INTERVAL_MS = 5000;
+
private final CloseableHttpClient httpClient = HttpClientBuilder.create().build();
private final Gson gson = new Gson();
@@ -83,24 +86,25 @@
List<InsertData> insertServiceList = this.getInsertServiceList();
for (MQInfo mqInfo : mqInfoList) {
- if (mqConfig.isPulsar()) {
+ if (mqConfig.isPulsar() && MQType.PULSAR.equals(mqInfo.getMqType())) {
mqConfig.setPulsarServerUrl(mqInfo.getUrl());
mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
break;
- } else if (mqConfig.isTube()) {
+ } else if (mqConfig.isTube() && MQType.TUBEMQ.equals(mqInfo.getMqType())) {
mqConfig.setTubeMasterList(mqInfo.getUrl());
mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
break;
- } else if (mqConfig.isKafka()) {
+ } else if (mqConfig.isKafka() && MQType.KAFKA.equals(mqInfo.getMqType())) {
mqConfig.setKafkaServerUrl(mqInfo.getUrl());
mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig);
break;
- } else {
- LOG.error("Unknown MessageQueue {}", mqConfig.getMqType());
- return;
}
}
+ if (mqConsume == null) {
+ LOG.error("Unknown MessageQueue {}", mqConfig.getMqType());
+ }
+
if (storeConfig.isElasticsearchStore()) {
esService.startTimerRoutine();
}
@@ -133,19 +137,23 @@
private List<MQInfo> getClusterFromManager() {
Properties properties = new Properties();
+ List<MQInfo> mqConfig;
try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_PROPERTIES)) {
properties.load(inputStream);
String managerHosts = properties.getProperty("manager.hosts");
String clusterTag = properties.getProperty("proxy.cluster.tag");
String[] hostList = StringUtils.split(managerHosts, ",");
for (String host : hostList) {
- List<MQInfo> mqConfig = getMQConfig(host, clusterTag);
- if (ObjectUtils.isNotEmpty(mqConfig)) {
- LOG.info("return mqConfig");
- return mqConfig;
+ while (true) {
+ mqConfig = getMQConfig(host, clusterTag);
+ if (ObjectUtils.isNotEmpty(mqConfig)) {
+ return mqConfig;
+ }
+ LOG.info("MQ config may not be registered yet, wait for 5s and try again");
+ Thread.sleep(INTERVAL_MS);
}
}
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
return null;